Rumah > pembangunan bahagian belakang > Golang > Apache Beam pilih baris N teratas daripada PCollection dalam Go

Apache Beam pilih baris N teratas daripada PCollection dalam Go

PHPz
Lepaskan: 2024-02-10 17:48:08
ke hadapan
720 orang telah melayarinya

Apache Beam 从 Go 中的 PCollection 中选择前 N 行

Apache Beam 是一个开源的分布式数据处理框架,它提供了一种统一的编程模型,可以在不同的批处理和流处理引擎上运行。最近,Apache Beam 的 Go SDK 中新增了一个非常有用的功能——从 PCollection 中选择前 N 行。这个功能对于需要对大型数据集进行采样或者快速预览的场景非常有帮助。在本文中,我们将介绍如何在 Apache Beam 的 Go SDK 中使用这个功能,并展示一些实际的示例代码。让我们开始吧!

问题内容

我有一个 pcollection,我需要从中选择 n 个最大的行。我正在尝试使用 go 创建一个数据流管道并陷入困境。

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 5},
        {"adam", 8},
        {"john", 3},
        {"ben", 1},
        {"jose", 1},
        {"bryan", 1},
        {"kim", 1},
        {"tim", 1},
    }
    initial := beam.createlist(s, userlist)

    pc2 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, initial)

    beam.pardo0(s, printrow, pc2)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}
Salin selepas log masuk

从上面的代码中,我需要根据 user.age 选择前 5 行 我发现链接顶部包具有相同的功能,但它说它返回单个元素 pcollection。有什么不同?

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(less)
}

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func less(a, b User) bool {
    return a.Age < b.Age
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    best := top.Largest(s, initial, 5, less)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, best)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}
Salin selepas log masuk

我像上面一样添加了选择前 5 行的函数,但出现错误 []main.user is not allocate to main.user

我需要与以前相同格式的 pcollection,因为我需要进一步处理。我怀疑这是因为 top.largest 函数返回单个元素 pcollection。关于如何转换格式有什么想法吗?

解决方法

最好的 pcollection 是 []user

所以尝试一下...

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
    for _, row := range rows {
        emit(row)
    }
}, best)
Salin selepas log masuk

Atas ialah kandungan terperinci Apache Beam pilih baris N teratas daripada PCollection dalam Go. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan