So replizieren Sie die Zeilennummerierung von SQL in Spark-RDDs
Das Problem verstehen
Sie wollen um eine fortlaufende Zeilennummer für jeden Eintrag in einem Spark-RDD zu generieren, geordnet nach bestimmten Spalten und partitioniert nach einer Schlüsselspalte. Ähnlich wie row_number() von SQL über (partition by ... order by ...), aber unter Verwendung von Spark-RDDs.
Ihr erster Versuch
Ihr erster Versuch verwendet sortByKey und zipWithIndex, die nicht die gewünschten partitionierten Zeilennummern erzeugten. Beachten Sie, dass sortBy nicht direkt auf RDDs anwendbar ist, sondern dass Sie diese zuerst sammeln müssen, was zu einer Nicht-RDD-Ausgabe führt.
Lösung mit Spark 1.4
Datenvorbereitung
Erstellen Sie ein RDD mit Tupeln der Form (K, (col1, col2, col3)).
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3)) val temp1 = sc.parallelize(sample_data)
Partitionierte Zeilennummern generieren
Verwenden Sie rowNumber über ein partitioniertes Fenster, um Zeilennummern für jeden Schlüssel zu generieren:
import org.apache.spark.sql.functions._ temp1.toDF("key", "col1", "col2", "col3").withColumn("rownum", rowNumber() over (Window partitionBy "key" orderBy desc("col2"), "col3")))
Beispielausgabe
+---+----+----+----+------+ |key|col1|col2|col3|rownum| +---+----+----+----+------+ |1,2|1 |4 |7 |2 | |1,2|1 |2 |3 |1 | |1,2|2 |2 |3 |3 | |3,4|5 |5 |5 |1 | |3,4|5 |5 |9 |2 | |3,4|7 |5 |5 |3 | +---+----+----+----+------+
Das obige ist der detaillierte Inhalt vonWie generiert man sequentielle Zeilennummern in Spark-RDDs, ähnlich wie „row_number()' von SQL?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!