Maison > développement back-end > Golang > Filtre Apache Beam ParDo dans Go

Filtre Apache Beam ParDo dans Go

王林
Libérer: 2024-02-05 11:57:58
avant
1069 Les gens l'ont consulté

Go 中的 Apache Beam ParDo 过滤器

Contenu de la question

Je suis un développeur Python mais je devrais utiliser go pour créer un pipeline de flux de données. Je ne trouve pas autant d'exemples Apache Beam utilisant Go par rapport à Python ou Java.

J'ai le code suivant qui a une structure de nom d'utilisateur et d'âge. La tâche consiste à ajouter l'âge, puis à filtrer en fonction de l'âge. J'ai trouvé un moyen d'augmenter l'âge mais je suis bloqué sur la partie filtrage.

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.registerfunction(incrementage)
}

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func incrementage(list user) user {
    list.age++
    return list
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 40},
        {"adam", 50},
        {"john", 35},
        {"ben", 8},
    }
    initial := beam.createlist(s, userlist)

    pc := beam.pardo(s, incrementage, initial)

    pc1 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, pc)

    beam.pardo0(s, printrow, pc1)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}
Copier après la connexion

J'ai essayé de créer une fonction comme ci-dessous, mais cela renvoie une valeur booléenne au lieu de l'objet utilisateur. Je sais qu'il me manque quelque chose de simple mais je n'arrive pas à le comprendre.

func filterage(list user) user {
    return list.age > 40    
}
Copier après la connexion

En python, je peux écrire une fonction comme ci-dessous.

beam.Filter(lambda line: line["Age"] >= 40))
Copier après la connexion


Bonne réponse


Vous devez ajouter un émetteur dans la fonction pour lancer l'utilisateur :

func filterAge(list user, emit func(user)) {
    if list.Age > 40 {
        emit(list)
    }
}
Copier après la connexion

Comme écrit dans votre code actuel, 返回 list.age > 40 list.age > 40 Évalue d'abord vrai (un booléen) et renvoie ce booléen.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:stackoverflow.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal