Apache Beam est un framework de traitement de données distribué open source qui fournit un modèle de programmation unifié pouvant s'exécuter sur différents moteurs de traitement par lots et par flux. Récemment, une fonctionnalité très utile a été ajoutée au SDK Go d'Apache Beam : la sélection des N premières lignes d'une PCollection. Cette fonctionnalité est très utile pour les scénarios dans lesquels de grands ensembles de données doivent être échantillonnés ou prévisualisés rapidement. Dans cet article, nous expliquerons comment utiliser cette fonctionnalité dans le SDK Go d'Apache Beam et montrerons des exemples de code pratiques. commençons!
J'ai une pcollection dans laquelle je dois sélectionner les n plus grandes lignes. J'essaie de créer un pipeline de flux de données en utilisant Go et je suis bloqué.
package main import ( "context" "flag" "fmt" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) type user struct { name string age int } func printrow(ctx context.context, list user) { fmt.println(list) } func main() { flag.parse() beam.init() ctx := context.background() p := beam.newpipeline() s := p.root() var userlist = []user{ {"bob", 5}, {"adam", 8}, {"john", 3}, {"ben", 1}, {"jose", 1}, {"bryan", 1}, {"kim", 1}, {"tim", 1}, } initial := beam.createlist(s, userlist) pc2 := beam.pardo(s, func(row user, emit func(user)) { emit(row) }, initial) beam.pardo0(s, printrow, pc2) if err := beamx.run(ctx, p); err != nil { log.exitf(ctx, "failed to execute job: %v", err) } }
À partir du code ci-dessus, je dois sélectionner les 5 premières lignes en fonction de user.age J'ai trouvé le lien en haut du package qui a la même fonctionnalité mais il indique qu'il renvoie un seul élément pcollection. Quelle est la différence?
package main import ( "context" "flag" "fmt" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) func init() { beam.RegisterFunction(less) } type User struct { Name string Age int } func printRow(ctx context.Context, list User) { fmt.Println(list) } func less(a, b User) bool { return a.Age < b.Age } func main() { flag.Parse() beam.Init() ctx := context.Background() p := beam.NewPipeline() s := p.Root() var userList = []User{ {"Bob", 5}, {"Adam", 8}, {"John", 3}, {"Ben", 1}, {"Jose", 1}, {"Bryan", 1}, {"Kim", 1}, {"Tim", 1}, } initial := beam.CreateList(s, userList) best := top.Largest(s, initial, 5, less) pc2 := beam.ParDo(s, func(row User, emit func(User)) { emit(row) }, best) beam.ParDo0(s, printRow, pc2) if err := beamx.Run(ctx, p); err != nil { log.Exitf(ctx, "Failed to execute job: %v", err) } }
J'ai ajouté la fonction pour sélectionner les 5 premières lignes comme ci-dessus mais j'ai eu une erreur []main.user is not allocate to main.user
J'ai besoin de la collection dans le même format qu'avant car je dois la traiter davantage. Je soupçonne que c'est parce que la fonction top.largest renvoie un seul élément pcollection. Des idées sur la façon de convertir le format ?
La meilleure collection est []user
Alors essayez-le...
pc2 := beam.ParDo(s, func(rows []User, emit func(User)) { for _, row := range rows { emit(row) } }, best)
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!