I am a Python developer, but I have to use Go to build a data pipeline. I can't find nearly as many Apache Beam examples for Go as there are for Python or Java.
I have the following code, which uses a struct holding a username and an age. The task is to increment the age and then filter on it. I found a way to increment the age but am stuck on the filtering part.
```go
package main

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
	beam.RegisterFunction(incrementAge)
}

type user struct {
	Name string
	Age  int
}

func printRow(ctx context.Context, list user) {
	fmt.Println(list)
}

func incrementAge(list user) user {
	list.Age++
	return list
}

func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	var userList = []user{
		{"bob", 40},
		{"adam", 50},
		{"john", 35},
		{"ben", 8},
	}
	initial := beam.CreateList(s, userList)

	pc := beam.ParDo(s, incrementAge, initial)

	pc1 := beam.ParDo(s, func(row user, emit func(user)) {
		emit(row)
	}, pc)

	beam.ParDo0(s, printRow, pc1)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "failed to execute job: %v", err)
	}
}
```
I tried writing a function like the one below, but it returns a boolean value instead of the user object. I know I'm missing something simple but can't figure it out.
```go
func filterAge(list user) user {
	return list.Age > 40
}
```
In Python, I can write something like this:
```python
beam.Filter(lambda line: line["Age"] >= 40)
```
You need to add an emitter parameter to the function and use it to emit the user:
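```go
// filterAge emits the user only when the age check passes;
// users that fail the check are simply not emitted.
func filterAge(list user, emit func(user)) {
	if list.Age > 40 {
		emit(list)
	}
}
```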
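As currently written, `return list.Age > 40` evaluates the expression `list.Age > 40` to a boolean (true or false) and tries to return that boolean, but the function is declared to return a `user`, so it does not compile. With an emitter, the function returns nothing: it calls `emit` for the elements it wants to keep and skips the rest, so each input produces zero or one outputs.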