Maison > développement back-end > Golang > Apache Beam sélectionne les N premières lignes de PCollection dans Go

Apache Beam sélectionne les N premières lignes de PCollection dans Go

PHPz
Libérer: 2024-02-10 17:48:08
avant
573 Les gens l'ont consulté

Apache Beam 从 Go 中的 PCollection 中选择前 N 行

Apache Beam est un framework de traitement de données distribué open source qui fournit un modèle de programmation unifié pouvant s'exécuter sur différents moteurs de traitement par lots et par flux. Récemment, une fonctionnalité très utile a été ajoutée au SDK Go d'Apache Beam : la sélection des N premières lignes d'une PCollection. Cette fonctionnalité est très utile pour les scénarios dans lesquels de grands ensembles de données doivent être échantillonnés ou prévisualisés rapidement. Dans cet article, nous expliquerons comment utiliser cette fonctionnalité dans le SDK Go d'Apache Beam et montrerons des exemples de code pratiques. commençons!

Contenu de la question

J'ai une pcollection dans laquelle je dois sélectionner les n plus grandes lignes. J'essaie de créer un pipeline de flux de données en utilisant Go et je suis bloqué.

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 5},
        {"adam", 8},
        {"john", 3},
        {"ben", 1},
        {"jose", 1},
        {"bryan", 1},
        {"kim", 1},
        {"tim", 1},
    }
    initial := beam.createlist(s, userlist)

    pc2 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, initial)

    beam.pardo0(s, printrow, pc2)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}
Copier après la connexion

À partir du code ci-dessus, je dois sélectionner les 5 premières lignes en fonction de user.age J'ai trouvé le lien en haut du package qui a la même fonctionnalité mais il indique qu'il renvoie un seul élément pcollection. Quelle est la différence?

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(less)
}

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func less(a, b User) bool {
    return a.Age < b.Age
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    best := top.Largest(s, initial, 5, less)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, best)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}
Copier après la connexion

J'ai ajouté la fonction pour sélectionner les 5 premières lignes comme ci-dessus mais j'ai eu une erreur []main.user is not allocate to main.user

J'ai besoin de la collection dans le même format qu'avant car je dois la traiter davantage. Je soupçonne que c'est parce que la fonction top.largest renvoie un seul élément pcollection. Des idées sur la façon de convertir le format ?

Solution

La meilleure collection est []user

Alors essayez-le...

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
    for _, row := range rows {
        emit(row)
    }
}, best)
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:stackoverflow.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal