Dans ce tutoriel, nous explorerons la puissante combinaison de Python et PySpark pour traiter de grands ensembles de données. PySpark est une bibliothèque Python qui fournit une interface à Apache Spark, un système informatique en cluster rapide et polyvalent. En tirant parti de PySpark, nous pouvons distribuer et traiter efficacement les données sur un ensemble de machines, ce qui nous permet de gérer facilement des ensembles de données à grande échelle.
Dans cet article, nous approfondirons les principes fondamentaux de PySpark et démontrerons comment effectuer diverses tâches de traitement de données sur de grands ensembles de données. Nous aborderons des concepts clés tels que les RDD (Resilient Distributed Datasets) et les trames de données, et montrerons leur application pratique avec des exemples étape par étape. En étudiant ce didacticiel, vous comprendrez parfaitement comment utiliser efficacement PySpark pour traiter et analyser des ensembles de données à grande échelle.
Dans cette section, nous allons mettre en place l'environnement de développement et nous familiariser avec les concepts de base de PySpark. Nous verrons comment installer PySpark, initialiser une SparkSession et charger des données dans des RDD et des DataFrames. Commençons par installer PySpark :
# Install PySpark !pip install pyspark
Collecting pyspark ... Successfully installed pyspark-3.1.2
Après avoir installé PySpark, nous pouvons initialiser une SparkSession pour nous connecter à notre cluster Spark :
from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("LargeDatasetProcessing").getOrCreate()
Avec notre SparkSession prête, nous pouvons désormais charger des données dans des RDD ou des DataFrames. Les RDD constituent la structure de données de base de PySpark, qui fournit une collection distribuée d'éléments. Les DataFrames, quant à eux, organisent les données en colonnes nommées, similaires aux tables des bases de données relationnelles. Chargeons un fichier CSV en tant que DataFrame :
# Load a CSV file as a DataFrame df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)
+---+------+--------+ |id |name |age | +---+------+--------+ |1 |John |32 | |2 |Alice |28 | |3 |Bob |35 | +---+------+--------+
Comme vous pouvez le voir dans l'extrait de code ci-dessus, nous utilisons la méthode `read.csv()` pour lire le fichier CSV dans un bloc de données. Le paramètre `header=True` signifie que la première ligne contient des noms de colonnes, et `inferSchema=True` déduira automatiquement le type de données de chaque colonne.
Dans cette section, nous explorerons diverses techniques de transformation et d'analyse de données à l'aide de PySpark. Nous couvrirons des opérations telles que le filtrage, l'agrégation et la jointure d'ensembles de données. Commençons par filtrer les données selon des critères précis :
# Filter data filtered_data = df.filter(df["age"] > 30)
+---+----+---+ |id |name|age| +---+----+---+ |1 |John|32 | |3 |Bob |35 | +---+----+---+
Dans l'extrait de code ci-dessus, nous utilisons la méthode `filter()` pour sélectionner les lignes où la colonne « âge » est supérieure à 30. Cette opération nous permet d'extraire des sous-ensembles pertinents à partir de grands ensembles de données.
Ensuite, effectuons l'agrégation sur l'ensemble de données en utilisant les méthodes « groupBy() » et « agg() » :
# Aggregate data aggregated_data = df.groupBy("gender").agg({"salary": "mean", "age": "max"})
+------+-----------+--------+ |gender|avg(salary)|max(age)| +------+-----------+--------+ |Male |2500 |32 | |Female|3000 |35 | +------+-----------+--------+
Ici, nous regroupons les données par colonne « Genre » et calculons le salaire moyen et l'âge maximum pour chaque groupe. La trame de données « aggregated_data » résultante nous fournit des informations précieuses sur l'ensemble de données.
En plus du filtrage et de l'agrégation, PySpark nous permet également de joindre efficacement plusieurs ensembles de données. Prenons un exemple où nous avons deux DataFrames : "df1" et "df2". Nous pouvons les rejoindre sur la base d'une colonne commune :
# Join two DataFrames joined_data = df1.join(df2, on="id", how="inner")
+---+----+---------+------+ |id |name|department|salary| +---+----+---------+------+ |1 |John|HR |2500 | |2 |Alice|IT |3000 | |3 |Bob |Sales |2000 | +---+----+---------+------+
La méthode `join()` nous permet de fusionner des DataFrames en fonction des colonnes communes spécifiées par le paramètre `on`. En fonction de nos besoins, nous pouvons choisir différents types de connexion, tels que « interne », « externe », « gauche » ou « droite ».
Dans cette section, nous explorerons la technologie avancée PySpark pour améliorer davantage nos capacités de traitement de données. Nous aborderons des sujets tels que les fonctions définies par l'utilisateur (UDF), les fonctions de fenêtre et la mise en cache. Commençons par définir et utiliser les UDF :
from pyspark.sql.functions import udf # Define a UDF def square(x): return x ** 2 # Register the UDF square_udf = udf(square) # Apply the UDF to a column df = df.withColumn("age_squared", square_udf(df["age"]))
+---+------+---+------------+ |id |name |age|age_squared | +---+------+---+------------+ |1 |John |32 |1024 | |2 |Alice |28 |784 | |3 |Bob |35 |1225 | +---+------+---+------------+
Dans l'extrait de code ci-dessus, nous avons défini une fonction UDF simple nommée `square()` qui est utilisée pour mettre au carré l'entrée donnée. Nous enregistrons ensuite cet UDF à l'aide de la fonction `udf()` et l'appliquons à la colonne "age", créant une nouvelle colonne appelée "age_squared" dans notre DataFrame.
PySpark fournit également de puissantes fonctions de fenêtre qui nous permettent d'effectuer des calculs dans une plage de fenêtre spécifique. Considérons les lignes précédentes et suivantes pour calculer le salaire moyen par employé :
from pyspark.sql.window import Window from pyspark.sql.functions import lag, lead, avg # Define the window window = Window.orderBy("id") # Calculate average salary with lag and lead df = df.withColumn("avg_salary", (lag(df["salary"]).over(window) + lead(df["salary"]).over(window) + df["salary"]) / 3)
+---+----+---------+------+----------+ |id |name|department|salary|avg_salary| +---+----+---------+------+----------+ |1 |John|HR |2500 |2666.6667 | |2 |Alice| IT |3000 |2833.3333 | |3 |Bob |Sales |2000 |2500 | +---+----+---------+------+----------+
Dans l'extrait de code ci-dessus, nous utilisons la méthode "Window.orderBy()" pour définir une fenêtre qui spécifie l'ordre des lignes en fonction de la colonne "id". Nous utilisons ensuite les fonctions "lag()" et "lead()" pour accéder respectivement à la ligne précédente et suivante. Enfin, nous calculons le salaire moyen en considérant la ligne actuelle et ses voisines.
Enfin, la mise en cache est une technologie importante dans PySpark pour améliorer les performances des algorithmes itératifs ou des calculs répétés. Nous pouvons mettre en cache un DataFrame ou un RDD en mémoire en utilisant la méthode `cache()` :
# Cache a DataFrame df.cache()
La mise en cache n'affichera aucune sortie, mais les opérations ultérieures reposant sur le DataFrame mis en cache seront plus rapides puisque les données sont stockées en mémoire.
Dans ce tutoriel, nous explorons la puissance de PySpark pour traiter de grands ensembles de données en Python. Nous avons d'abord configuré l'environnement de développement et chargé les données dans des RDD et des DataFrames. Nous avons ensuite approfondi les techniques de transformation et d'analyse des données, notamment le filtrage, l'agrégation et la jointure d'ensembles de données. Enfin, nous discutons des techniques avancées de PySpark telles que les fonctions définies par l'utilisateur, les fonctions de fenêtre et la mise en cache.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!