首頁 > 後端開發 > Golang > Apache Beam 從 Go 中的 PCollection 中選擇前 N 行

Apache Beam 從 Go 中的 PCollection 中選擇前 N 行

PHPz
發布: 2024-02-10 17:48:08
轉載
573 人瀏覽過

Apache Beam 从 Go 中的 PCollection 中选择前 N 行

Apache Beam 是一個開源的分散式資料處理框架,它提供了一個統一的程式設計模型,可以在不同的批次和串流處理引擎上運行。最近,Apache Beam 的 Go SDK 中新增了一個非常有用的功能——從 PCollection 中選擇前 N 行。這個功能對於需要對大型資料集進行取樣或快速預覽的場景非常有幫助。在本文中,我們將介紹如何在 Apache Beam 的 Go SDK 中使用這個功能,並展示一些實際的範例程式碼。讓我們開始吧!

問題內容

我有一個 pcollection,我需要從中選擇 n 個最大的行。我正在嘗試使用 go 創建資料流管道並陷入困境。

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 5},
        {"adam", 8},
        {"john", 3},
        {"ben", 1},
        {"jose", 1},
        {"bryan", 1},
        {"kim", 1},
        {"tim", 1},
    }
    initial := beam.createlist(s, userlist)

    pc2 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, initial)

    beam.pardo0(s, printrow, pc2)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}
登入後複製

從上面的程式碼中,我需要根據 user.age 選擇前 5 行 我發現鏈接頂部包具有相同的功能,但它說它返回單個元素 pcollection。有什麼不同?

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(less)
}

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func less(a, b User) bool {
    return a.Age < b.Age
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    best := top.Largest(s, initial, 5, less)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, best)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}
登入後複製

我像上面一樣加入了選擇前 5 行的函數,但出現錯誤 []main.user is not allocate to main.user

我需要與以前相同格式的 pcollection,因為我需要進一步處理。我懷疑這是因為 top.largest 函數傳回單一元素 pcollection。關於如何轉換格式有什麼想法嗎?

解決方法

最好的 pcollection 是 []user

所以嘗試一下...

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
    for _, row := range rows {
        emit(row)
    }
}, best)
登入後複製

以上是Apache Beam 從 Go 中的 PCollection 中選擇前 N 行的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:stackoverflow.com
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板