Dalam tutorial ini, kami akan meneroka gabungan Python dan PySpark yang berkuasa untuk memproses set data yang besar. PySpark ialah perpustakaan Python yang menyediakan antara muka kepada Apache Spark, sistem pengkomputeran kluster yang pantas dan serba boleh. Dengan memanfaatkan PySpark, kami boleh mengedarkan dan memproses data dengan cekap merentas set mesin, membolehkan kami mengendalikan set data berskala besar dengan mudah.
Dalam artikel ini, kami akan menyelidiki asas PySpark dan menunjukkan cara melaksanakan pelbagai tugas pemprosesan data pada set data yang besar. Kami akan merangkumi konsep utama seperti RDD (Resilient Distributed Datasets) dan bingkai data, dan menunjukkan aplikasi praktikalnya dengan contoh langkah demi langkah. Dengan mempelajari tutorial ini, anda akan mempunyai pemahaman yang kukuh tentang cara menggunakan PySpark dengan berkesan untuk memproses dan menganalisis set data berskala besar.
Dalam bahagian ini, kami akan menyediakan persekitaran pembangunan dan membiasakan diri dengan konsep asas PySpark. Kami akan membincangkan cara memasang PySpark, memulakan SparkSession dan memuatkan data ke dalam RDD dan DataFrames. Mari mulakan memasang PySpark:
# Install PySpark !pip install pyspark
Collecting pyspark ... Successfully installed pyspark-3.1.2
Selepas memasang PySpark, kami boleh memulakan SparkSession untuk menyambung ke gugusan Spark kami:
from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("LargeDatasetProcessing").getOrCreate()
Dengan SparkSession kami sedia, kami kini boleh memuatkan data ke dalam RDD atau DataFrames. RDD ialah struktur data asas dalam PySpark, yang menyediakan koleksi elemen yang diedarkan. DataFrames, sebaliknya, menyusun data ke dalam lajur yang dinamakan, serupa dengan jadual dalam pangkalan data hubungan. Mari muatkan fail CSV sebagai DataFrame:
# Load a CSV file as a DataFrame df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)
+---+------+--------+ |id |name |age | +---+------+--------+ |1 |John |32 | |2 |Alice |28 | |3 |Bob |35 | +---+------+--------+
Seperti yang anda boleh lihat daripada coretan kod di atas, kami menggunakan kaedah `read.csv()` untuk membaca fail CSV ke dalam bingkai data. Parameter `header=True` bermaksud bahawa baris pertama mengandungi nama lajur dan `inferSchema=True` akan membuat kesimpulan secara automatik jenis data setiap lajur.
Dalam bahagian ini, kami akan meneroka pelbagai transformasi data dan teknik analisis menggunakan PySpark. Kami akan merangkumi operasi seperti penapisan, pengagregatan dan penggabungan set data. Mari mulakan dengan menapis data berdasarkan kriteria tertentu:
# Filter data filtered_data = df.filter(df["age"] > 30)
+---+----+---+ |id |name|age| +---+----+---+ |1 |John|32 | |3 |Bob |35 | +---+----+---+
Dalam coretan kod di atas, kami menggunakan kaedah `filter()` untuk memilih baris dengan lajur "umur" lebih daripada 30. Operasi ini membolehkan kami mengekstrak subset yang berkaitan daripada set data yang besar.
Seterusnya, mari lakukan pengagregatan pada set data menggunakan kaedah "groupBy()" dan "agg()":
# Aggregate data aggregated_data = df.groupBy("gender").agg({"salary": "mean", "age": "max"})
+------+-----------+--------+ |gender|avg(salary)|max(age)| +------+-----------+--------+ |Male |2500 |32 | |Female|3000 |35 | +------+-----------+--------+
Di sini, kami mengumpulkan data mengikut lajur "Jantina" dan mengira purata gaji dan umur maksimum bagi setiap kumpulan. Bingkai data "data_agregat" yang terhasil memberikan kami cerapan berharga tentang set data.
Selain penapisan dan pengagregatan, PySpark juga membolehkan kami menyertai berbilang set data dengan cekap. Mari kita pertimbangkan contoh di mana kita mempunyai dua DataFrames: "df1" dan "df2". Kita boleh menyertai mereka berdasarkan lajur biasa:
# Join two DataFrames joined_data = df1.join(df2, on="id", how="inner")
+---+----+---------+------+ |id |name|department|salary| +---+----+---------+------+ |1 |John|HR |2500 | |2 |Alice|IT |3000 | |3 |Bob |Sales |2000 | +---+----+---------+------+
Kaedah `join()` membolehkan kami menggabungkan DataFrames berdasarkan lajur biasa yang ditentukan oleh parameter `on`. Bergantung pada keperluan kita, kita boleh memilih jenis sambungan yang berbeza, seperti "dalaman", "luar", "kiri" atau "kanan".
Dalam bahagian ini, kami akan meneroka teknologi PySpark termaju untuk meningkatkan lagi keupayaan pemprosesan data kami. Kami akan membincangkan topik seperti fungsi takrif pengguna (UDF), fungsi tetingkap dan caching. Mari kita mulakan dengan mentakrif dan menggunakan UDF:
from pyspark.sql.functions import udf # Define a UDF def square(x): return x ** 2 # Register the UDF square_udf = udf(square) # Apply the UDF to a column df = df.withColumn("age_squared", square_udf(df["age"]))
+---+------+---+------------+ |id |name |age|age_squared | +---+------+---+------------+ |1 |John |32 |1024 | |2 |Alice |28 |784 | |3 |Bob |35 |1225 | +---+------+---+------------+
Dalam coretan kod di atas, kami telah mentakrifkan fungsi UDF ringkas bernama `square()` yang digunakan untuk mengduakan input yang diberikan. Kami kemudiannya mendaftarkan UDF ini menggunakan fungsi `udf()` dan menggunakannya pada lajur "umur", mencipta lajur baharu yang dipanggil "age_squared" dalam DataFrame kami.
PySpark juga menyediakan fungsi tetingkap berkuasa yang membolehkan kami melakukan pengiraan dalam julat tetingkap tertentu. Mari kita pertimbangkan baris sebelumnya dan seterusnya untuk mengira purata gaji setiap pekerja:
from pyspark.sql.window import Window from pyspark.sql.functions import lag, lead, avg # Define the window window = Window.orderBy("id") # Calculate average salary with lag and lead df = df.withColumn("avg_salary", (lag(df["salary"]).over(window) + lead(df["salary"]).over(window) + df["salary"]) / 3)
+---+----+---------+------+----------+ |id |name|department|salary|avg_salary| +---+----+---------+------+----------+ |1 |John|HR |2500 |2666.6667 | |2 |Alice| IT |3000 |2833.3333 | |3 |Bob |Sales |2000 |2500 | +---+----+---------+------+----------+
Dalam petikan kod di atas, kami menggunakan kaedah "Window.orderBy()" untuk menentukan tetingkap yang menentukan susunan baris berdasarkan lajur "id". Kami kemudian menggunakan fungsi "lag()" dan "lead()" untuk mengakses baris sebelumnya dan seterusnya masing-masing. Akhirnya, kami mengira purata gaji dengan mempertimbangkan barisan semasa dan jirannya.
Akhir sekali, caching ialah teknologi penting dalam PySpark untuk meningkatkan prestasi algoritma lelaran atau pengiraan berulang. Kita boleh cache DataFrame atau RDD dalam ingatan menggunakan kaedah `cache()`:
# Cache a DataFrame df.cache()
Caching tidak akan menunjukkan sebarang output, tetapi operasi seterusnya bergantung pada DataFrame yang dicache akan menjadi lebih pantas kerana data disimpan dalam memori.
Dalam tutorial ini, kami meneroka kuasa PySpark untuk memproses set data yang besar dalam Python. Kami mula-mula menyediakan persekitaran pembangunan dan memuatkan data ke dalam RDD dan DataFrames. Kami kemudian menyelidiki transformasi data dan teknik analisis, termasuk penapisan, pengagregatan dan penggabungan set data. Akhir sekali, kami membincangkan teknik PySpark lanjutan seperti fungsi yang ditentukan pengguna, fungsi tetingkap dan caching.
Atas ialah kandungan terperinci Proses set data yang besar dengan Python PySpark. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!