실제 기계 학습을 위해 분산 컴퓨팅이 필요한 이유

WBOY
풀어 주다: 2024-09-10 06:49:32
원래의
765명이 탐색했습니다.

Why You Need Distributed Computing for Real-World Machine Learning

그리고 PySpark가 전문가처럼 대용량 데이터 세트를 처리하는 데 어떻게 도움이 되는지

PyTorch 및 TensorFlow와 같은 머신러닝 프레임워크는 모델 구축에 매우 적합합니다. 그러나 현실은 거대한 데이터 세트를 다루는 실제 프로젝트의 경우 좋은 모델 그 이상이 필요하다는 것입니다. 모든 데이터를 효율적으로 처리하고 관리할 수 있는 방법이 필요합니다. PySpark와 같은 분산 컴퓨팅이 문제를 해결하기 위해 등장하는 곳입니다.

실제 머신러닝에서 빅데이터를 처리하는 것이 PyTorch와 TensorFlow를 넘어서는 이유와 PySpark가 이를 달성하는 데 어떻게 도움이 되는지 자세히 살펴보겠습니다.
진짜 문제: 빅데이터
온라인에서 볼 수 있는 대부분의 ML 예제는 작고 관리 가능한 데이터 세트를 사용합니다. 모든 것을 메모리에 넣고, 가지고 놀고, 몇 분 만에 모델을 훈련할 수 있습니다. 그러나 신용 카드 사기 감지, 추천 시스템, 재무 예측과 같은 실제 시나리오에서는 수백만 또는 수십억 개의 행을 처리하게 됩니다. 갑자기 노트북이나 서버가 처리할 수 없게 되었습니다.

모든 데이터를 PyTorch 또는 TensorFlow에 한 번에 로드하려고 하면 문제가 발생합니다. 이러한 프레임워크는 대규모 데이터 세트를 효율적으로 처리하기 위한 것이 아니라 모델 교육용으로 설계되었습니다. 이것이 바로 분산 컴퓨팅이 중요한 부분입니다.
PyTorch와 TensorFlow만으로는 충분하지 않은 이유
PyTorch와 TensorFlow는 모델 구축 및 최적화에 적합하지만 대규모 데이터 작업을 처리할 때는 부족합니다. 두 가지 주요 문제:

  • 메모리 과부하: 학습 전에 전체 데이터 세트를 메모리에 로드합니다. 소규모 데이터 세트에는 효과가 있지만 테라바이트 규모의 데이터가 있으면 게임이 끝납니다.
  • 분산 데이터 처리 없음: PyTorch와 TensorFlow는 분산 데이터 처리를 처리하도록 구축되지 않았습니다. 여러 시스템에 걸쳐 방대한 양의 데이터가 분산되어 있다면 실제로는 도움이 되지 않습니다.

여기서 PySpark가 빛을 발합니다. 분산된 데이터를 사용하여 여러 시스템에서 효율적으로 처리하는 동시에 시스템 충돌 없이 대규모 데이터 세트를 처리하도록 설계되었습니다.

실제 사례: PySpark를 이용한 신용카드 사기 탐지
예를 들어 보겠습니다. 신용 카드 거래 데이터를 사용하는 사기 탐지 시스템을 작업하고 있다고 가정해 보겠습니다. 이 경우 Kaggle의 인기 있는 데이터 세트를 사용하겠습니다. 여기에는 284,000건이 넘는 거래가 포함되어 있으며 그 중 사기 거래는 1% 미만입니다.

1단계: Google Colab에서 PySpark 설정
최소한의 설정으로 PySpark를 실행할 수 있는 Google Colab을 사용하겠습니다.

!pip install pyspark

로그인 후 복사

다음으로 필요한 라이브러리를 가져오고 Spark 세션을 시작하세요.

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, udf
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
import numpy as np
from pyspark.sql.types import FloatType
로그인 후 복사

pyspark 세션 시작

spark = SparkSession.builder \
    .appName("FraudDetectionImproved") \
    .master("local[*]") \
    .config("spark.executorEnv.PYTHONHASHSEED", "0") \
    .getOrCreate()
로그인 후 복사

2단계: 데이터 로드 및 준비

data = spark.read.csv('creditcard.csv', header=True, inferSchema=True)
data = data.orderBy("Time")  # Ensure data is sorted by time
data.show(5)
data.describe().show()
로그인 후 복사
# Check for missing values in each column
data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show()

# Prepare the feature columns
feature_columns = data.columns
feature_columns.remove("Class")  # Removing "Class" column as it is our label

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)
data.select("features", "Class").show(5)

# Split data into train (60%), test (20%), and unseen (20%)
train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42)
test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42)

# Print class distribution in each dataset
print("Train Data:")
train_data.groupBy("Class").count().show()

print("Test and parameter optimisation Data:")
test_data.groupBy("Class").count().show()

print("Unseen Data:")
unseen_data.groupBy("Class").count().show()
로그인 후 복사

3단계: 모델 초기화

# Initialize RandomForestClassifier
rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability")

# Create ParamGrid for Cross Validation
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20 ]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Create 5-fold CrossValidator
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"),
                          numFolds=5)
로그인 후 복사

4단계: 적합, 교차 검증 실행, 가장 적합한 매개변수 세트 선택

# Run cross-validation, and choose the best set of parameters
rf_model = crossval.fit(train_data)

# Make predictions on test data
predictions_rf = rf_model.transform(test_data)

# Evaluate Random Forest Model
binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

auc_rf = binary_evaluator.evaluate(predictions_rf)
auprc_rf = pr_evaluator.evaluate(predictions_rf)
print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}")

# UDF to extract positive probability from probability vector
extract_prob = udf(lambda prob: float(prob[1]), FloatType())
predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))
로그인 후 복사

5단계 정밀도, 재현율, F1 점수 계산 기능

# Function to calculate precision, recall, and F1-score
def calculate_metrics(predictions):
    tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count()
    fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count()
    fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count()

    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    return precision, recall, f1_score
로그인 후 복사

6단계: 모델에 가장 적합한 임계값 찾기

# Find the best threshold for the model
best_threshold = 0.5
best_f1 = 0
for threshold in np.arange(0.1, 0.9, 0.1):
    thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double"))
    precision, recall, f1 = calculate_metrics(thresholded_predictions)

    if f1 > best_f1:
        best_f1 = f1
        best_threshold = threshold

print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")
로그인 후 복사

7단계: 보이지 않는 데이터 평가

# Evaluate on unseen data
predictions_unseen = rf_model.transform(unseen_data)
auc_unseen = binary_evaluator.evaluate(predictions_unseen)
print(f"Unseen Data - AUC: {auc_unseen:.4f}")

precision, recall, f1 = calculate_metrics(predictions_unseen)
print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")

area_under_roc = binary_evaluator.evaluate(predictions_unseen)
area_under_pr = pr_evaluator.evaluate(predictions_unseen)
print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")
로그인 후 복사

결과

Best threshold: 0.30000000000000004, Best F1-score: 0.9062
Unseen Data - AUC: 0.9384
Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485
Unseen Data - AUC: 0.9423, AUPRC: 0.8618
로그인 후 복사

이 모델을 저장하고(몇 KB) pyspark 파이프라인 어디에서나 사용할 수 있습니다

rf_model.save()
로그인 후 복사

실제 기계 학습 작업에서 대규모 데이터 세트를 처리할 때 PySpark가 큰 차이를 만드는 이유는 다음과 같습니다.

손쉬운 확장: PySpark는 클러스터 전체에 작업을 분산하여 메모리 부족 없이 테라바이트 규모의 데이터를 처리할 수 있습니다.
즉각적인 데이터 처리: PySpark는 전체 데이터 세트를 메모리에 로드할 필요가 없습니다. 필요에 따라 데이터를 처리하므로 훨씬 더 효율적입니다.
더 빠른 모델 교육: 분산 컴퓨팅을 사용하면 여러 시스템에 컴퓨팅 워크로드를 분산하여 모델을 더 빠르게 교육할 수 있습니다.
최종 생각
PyTorch와 TensorFlow는 머신러닝 모델을 구축하기 위한 환상적인 도구이지만 실제 대규모 작업에는 더 많은 것이 필요합니다. PySpark를 사용한 분산 컴퓨팅을 사용하면 대규모 데이터 세트를 효율적으로 처리하고 실시간으로 데이터를 처리하며 기계 학습 파이프라인을 확장할 수 있습니다.

따라서 다음에 사기 탐지, 추천 시스템, 재무 분석 등 대규모 데이터를 사용하여 작업할 때 PySpark를 사용하여 프로젝트를 한 단계 더 발전시키는 것을 고려해 보세요.

전체 코드와 결과를 보려면 이 노트를 확인하세요. :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23

__

저는 Swapnil입니다. 결과와 아이디어에 대한 의견을 남겨주시거나 데이터, 소프트웨어 개발 작업 및 채용 정보에 대해 swapnil@nooffice.no로 저에게 핑을 보내주세요

위 내용은 실제 기계 학습을 위해 분산 컴퓨팅이 필요한 이유의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

원천:dev.to
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿