Golang s'est avéré très adapté à la programmation simultanée. Les Goroutines sont plus lisibles, élégantes et efficaces que la programmation asynchrone. Cet article propose un modèle d'exécution de Pipeline adapté à la mise en œuvre par Golang, adapté au traitement par lots de grandes quantités de données (ETL).
Imaginez ce scénario d'application : 1 milliard ; associer les informations utilisateur de la base de données B (MySQL) en fonction de l'ID utilisateur de chaque commentaire ; ) pour traiter chaque commentaire ; écrire les résultats du traitement dans la base de données C (ElasticSearch).
En raison de divers problèmes rencontrés dans l'application, ces exigences ont été résumées :Exigence 1 : Les données doivent être traitées par lots, par exemple 100 éléments par lot. Lorsqu'un problème survient (comme une panne de base de données), il sera interrompu et un point de contrôle sera utilisé pour récupérer de l'interruption au prochain démarrage du programme.
Exigence 2 : Définir un nombre raisonnable de concurrence pour chaque processus, afin que la base de données et le service NLP aient une charge raisonnable (sans affecter les autres activités, occuper autant de ressources que possible pour améliorer les performances ETL). Par exemple, les étapes (1) à (4) définissent les numéros de concurrence sur 1, 4, 8 et 2 respectivement.
Il s'agit d'un modèle d'exécution typique de Pipeline. Considérez chaque lot de données (par exemple, 100 articles) comme un produit sur la chaîne d'assemblage. Les 4 étapes correspondent aux 4 procédures de traitement sur la chaîne d'assemblage. Une fois chaque processus terminé, le produit semi-fini est remis à. le prochain processus. Le nombre de produits pouvant être traités simultanément dans chaque processus varie.
Module Pipeline réutilisable
Afin de terminer le travail ETL plus efficacement, j'ai abstrait Pipeline en modules. Je vais d'abord coller le code, puis analyser la signification. Le module est utilisable directement, et les principales interfaces utilisées sont : NewPipeline, Async et Wait.
Grâce à ce composant Pipeline, notre programme ETL sera simple, efficace et fiable, libérant les programmeurs du contrôle fastidieux des processus simultanés :package main import "log" func main() { //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。 checkpoint := loadCheckpoint() //工序(1)在pipeline外执行,最后一个工序是保存checkpoint pipeline := NewPipeline(4, 8, 2, 1) for { //(1) //加载100条数据,并修改变量checkpoint //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。 data, err := extractReviewsFromA(&checkpoint, 100) if err != nil { log.Print(err) break } //这里有个Golang著名的坑。 //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。 //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。 curCheckpoint := checkpoint ok := pipeline.Async(func() error { //(2) return joinUserFromB(data) }, func() error { //(3) return nlp(data) }, func() error { //(4) return loadDataToC(data) }, func() error { //(5)保存checkpoint log.Print("done:", curCheckpoint) return saveCheckpoint(curCheckpoint) }) if !ok { break } if len(data) < 100 { break } //处理完毕 } err := pipeline.Wait() if err != nil { log.Print(err) } }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!