php editor Xiaoxin is here to introduce to you the Apache Beam left connection in the Go language. Apache Beam is a distributed data processing framework that provides a common programming model for executing batch and stream processing tasks on different distributed data processing engines. The left join is a common data processing operation. It can associate two data sets according to a certain key and return all the records in the left data set and the matching records in the right data set. This article will introduce in detail how to use Apache Beam to perform left join operations in Go language.
Is there a simple way to use go to perform a left join of 2 pcollections? I found that sql connection is only available in java.
package main import ( "context" "flag" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) type customer struct { CustID int FName string } type order struct { OrderID int Amount int Cust_ID int } func main() { flag.Parse() beam.Init() ctx := context.Background() p := beam.NewPipeline() s := p.Root() var custList = []customer{ {1, "Bob"}, {2, "Adam"}, {3, "John"}, {4, "Ben"}, {5, "Jose"}, {6, "Bryan"}, {7, "Kim"}, {8, "Tim"}, } var orderList = []order{ {123, 100, 1}, {125, 30, 3}, {128, 50, 7}, } custPCol := beam.CreateList(s, custList) orderPCol := beam.CreateList(s, orderList) // Left Join custPcol with orderPCol // Expected Result // CustID | FName |OrderID| Amount // 1 | Bob | 123 | 100 // 2 | Adam | | // 3 | John | 125 | 100 // 4 | Ben | | // 5 | Jose | | // 6 | Bryan | | // 7 | Kim | 125 | 100 // 8 | Tim | | if err := beamx.Run(ctx, p); err != nil { log.Exitf(ctx, "Failed to execute job: %v", err) } }
I want to join these 2 pcollections and perform further operations. I saw the documentation for cogroupbykey but I can't convert it into a format that normal sql join can do.
Any suggestions on this?
Try this
type resulttype struct { custid int fname string orderid int amount int } result := beam.pardo(s, func(c customer, iterorder func(*order) bool) resulttype { var o order for iterorder(&o) { if c.custid == o.cust_id { return resulttype{ custid: c.custid, fname: c.fname, orderid: o.orderid, amount: o.amount, } } } return resulttype{ custid: c.custid, fname: c.fname, } }, custpcol, beam.sideinput{input: orderpcol})
Or if you want to use cogroupbykey...
custWithKeyPCol := beam.ParDo(s, func(c customer) (int, customer) { return c.CustID, c }, custPCol) orderWithKeyPCol := beam.ParDo(s, func(o order) (int, order) { return o.Cust_ID, o }, orderPCol) resultPCol := beam.CoGroupByKey(s, custWithKeyPCol, orderWithKeyPCol) beam.ParDo0(s, func(CustID int, custIter func(*customer) bool, orderIter func(*order) bool) { c, o := customer{}, order{} for custIter(&c) { if ok := orderIter(&o); ok { fmt.Println(CustID, c.FName, o.OrderID, o.Amount) } fmt.Println(CustID, c.FName) } }, resultPCol)
The above is the detailed content of Apache Beam left join in Go. For more information, please follow other related articles on the PHP Chinese website!