Apache Beam ist ein Open-Source-Framework für die verteilte Datenverarbeitung, das ein einheitliches Programmiermodell bereitstellt, das auf verschiedenen Batch- und Stream-Verarbeitungs-Engines ausgeführt werden kann. Kürzlich wurde dem Go SDK von Apache Beam eine sehr nützliche Funktion hinzugefügt – die Auswahl der ersten N Zeilen aus einer PCollection. Diese Funktion ist sehr hilfreich für Szenarien, in denen große Datensätze erfasst oder schnell in der Vorschau angezeigt werden müssen. In diesem Artikel erläutern wir die Verwendung dieser Funktion im Go SDK von Apache Beam und zeigen einige praktische Beispielcodes. Lasst uns beginnen!
Ich habe eine Sammlung, aus der ich die n größten Zeilen auswählen muss. Ich versuche mit go eine Datenflusspipeline zu erstellen und stecke fest.
package main import ( "context" "flag" "fmt" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) type user struct { name string age int } func printrow(ctx context.context, list user) { fmt.println(list) } func main() { flag.parse() beam.init() ctx := context.background() p := beam.newpipeline() s := p.root() var userlist = []user{ {"bob", 5}, {"adam", 8}, {"john", 3}, {"ben", 1}, {"jose", 1}, {"bryan", 1}, {"kim", 1}, {"tim", 1}, } initial := beam.createlist(s, userlist) pc2 := beam.pardo(s, func(row user, emit func(user)) { emit(row) }, initial) beam.pardo0(s, printrow, pc2) if err := beamx.run(ctx, p); err != nil { log.exitf(ctx, "failed to execute job: %v", err) } }
Aus dem obigen Code muss ich die ersten 5 Zeilen basierend auf user.age auswählen Ich habe den Link oben im Paket gefunden, der die gleiche Funktionalität hat, aber besagt, dass er eine einzelne Element-Pcollection zurückgibt. Was ist der Unterschied?
package main import ( "context" "flag" "fmt" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) func init() { beam.RegisterFunction(less) } type User struct { Name string Age int } func printRow(ctx context.Context, list User) { fmt.Println(list) } func less(a, b User) bool { return a.Age < b.Age } func main() { flag.Parse() beam.Init() ctx := context.Background() p := beam.NewPipeline() s := p.Root() var userList = []User{ {"Bob", 5}, {"Adam", 8}, {"John", 3}, {"Ben", 1}, {"Jose", 1}, {"Bryan", 1}, {"Kim", 1}, {"Tim", 1}, } initial := beam.CreateList(s, userList) best := top.Largest(s, initial, 5, less) pc2 := beam.ParDo(s, func(row User, emit func(User)) { emit(row) }, best) beam.ParDo0(s, printRow, pc2) if err := beamx.Run(ctx, p); err != nil { log.Exitf(ctx, "Failed to execute job: %v", err) } }
Ich habe die Funktion zum Auswählen der ersten 5 Zeilen wie oben hinzugefügt, aber ich habe eine Fehlermeldung erhalten []main.user is not allocate to main.user
Ich benötige die pcollection im gleichen Format wie zuvor, da ich sie weiter verarbeiten muss. Ich vermute, das liegt daran, dass die Funktion top.largest eine einzelne Element-Pcollection zurückgibt. Irgendwelche Ideen, wie man das Format konvertieren kann?
Die beste pcollection ist []user
Also probieren Sie es aus...
pc2 := beam.ParDo(s, func(rows []User, emit func(User)) { for _, row := range rows { emit(row) } }, best)
Das obige ist der detaillierte Inhalt vonApache Beam wählt die obersten N Zeilen aus PCollection in Go aus. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!