Comment répliquer la numérotation des lignes SQL dans les RDD Spark
Comprendre le problème
Vous voulez pour générer un numéro de ligne séquentiel pour chaque entrée dans un Spark RDD, classé par colonnes spécifiques et partitionné par une colonne clé. Semblable à row_number() over (partition by ... order by ...), mais en utilisant Spark RDD.
Votre tentative initiale
Votre tentative initiale utilisée sortByKey et zipWithIndex, qui n'ont pas produit les numéros de lignes partitionnés souhaités. Notez que sortBy n'est pas applicable directement aux RDD, vous obligeant à les collecter d'abord, ce qui entraîne une sortie non RDD.
Solution utilisant Spark 1.4
Préparation des données
Créer un RDD avec des tuples de la forme (K, (col1, col2, col3)).
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3)) val temp1 = sc.parallelize(sample_data)
Génération de numéros de ligne partitionnés
Utilisez rowNumber sur une fenêtre partitionnée pour générer des numéros de ligne pour chaque clé :
import org.apache.spark.sql.functions._ temp1.toDF("key", "col1", "col2", "col3").withColumn("rownum", rowNumber() over (Window partitionBy "key" orderBy desc("col2"), "col3")))
Exemple de sortie
+---+----+----+----+------+ |key|col1|col2|col3|rownum| +---+----+----+----+------+ |1,2|1 |4 |7 |2 | |1,2|1 |2 |3 |1 | |1,2|2 |2 |3 |3 | |3,4|5 |5 |5 |1 | |3,4|5 |5 |9 |2 | |3,4|7 |5 |5 |3 | +---+----+----+----+------+
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!