ETL (抽出、変換、ロード) プロセスは、特にリアルタイム データに基づいた迅速な意思決定が必要なアプリケーションにおいて、データを効率的に管理するための基礎です。この記事では、Binance API からのリアルタイムの暗号通貨取引を含む実際的な例を使用して ETL プロセスについて説明します。提供されている Python コードは、取引データを抽出し、使用可能な形式に変換し、SQLite データベースにロードし、リアルタイム プロットでデータを視覚化する方法を示しています。
サンプル ETL プロジェクト: https://github.com/vcse59/FeatureEngineering/tree/main/Real-Time-CryptoCurrency-Price-Tracker
1.抽出
ETL プロセスの最初のステップは抽出です。これには、さまざまなソースからデータを収集することが含まれます。この場合、データは WebSocket 接続を介して Binance Testnet API に抽出されます。この接続により、BTC/USDT 取引のリアルタイム ストリーミングが可能になります。
コードでの抽出の実装方法は次のとおりです。
with websockets.connect(url) as ws: response = await ws.recv() trade_data = json.loads(response)
受信した各メッセージには、価格、数量、タイムスタンプなどの重要な取引データが含まれており、JSON として形式化されています。
2.変換
データが抽出されると、変換プロセスが実行されます。このステップでは、データをクリーンにして構造化して、より有用なものにします。この例では、変換にはタイムスタンプをミリ秒から読み取り可能な形式に変換し、さらなる処理のためにデータを適切なタイプに編成することが含まれます。
price = float(trade_data['p']) quantity = float(trade_data['q']) timestamp = int(trade_data['T']) trade_time = datetime.fromtimestamp(timestamp / 1000.0)
これにより、価格と数量が浮動小数点として保存され、操作と分析が容易になるようにタイムスタンプが日時オブジェクトに変換されます。
3.ロード
最後のステップは読み込みで、変換されたデータがターゲット データベースに保存されます。私たちのコードでは、SQLite データベースが取引データの記憶媒体として機能します。
読み込みプロセスは次の関数によって管理されます:
def save_trade_to_db(price, quantity, timestamp): conn = sqlite3.connect('trades.db') cursor = conn.cursor() # Create a table if it doesn't exist cursor.execute(''' CREATE TABLE IF NOT EXISTS trades ( id INTEGER PRIMARY KEY AUTOINCREMENT, price REAL, quantity REAL, timestamp TEXT ) ''') # Insert the trade data cursor.execute(''' INSERT INTO trades (price, quantity, timestamp) VALUES (?, ?, ?) ''', (price, quantity, trade_time)) conn.commit() conn.close()
この関数は SQLite データベースに接続し、テーブルが存在しない場合はテーブルを作成し、取引データを挿入します。
4.視覚化
データを保存するだけでなく、より良い理解と意思決定のためにデータを視覚化することが不可欠です。提供されたコードには、リアルタイムで取引をプロットする関数が含まれています:
def plot_trades(): if len(trades) > 0: timestamps, prices, quantities = zip(*trades) plt.subplot(2, 1, 1) plt.cla() # Clear the previous plot for real-time updates plt.plot(timestamps, prices, label='Price', color='blue') plt.ylabel('Price (USDT)') plt.legend() plt.title('Real-Time BTC/USDT Prices') plt.xticks(rotation=45) plt.subplot(2, 1, 2) plt.cla() # Clear the previous plot for real-time updates plt.plot(timestamps, quantities, label='Quantity', color='orange') plt.ylabel('Quantity') plt.xlabel('Time') plt.legend() plt.xticks(rotation=45) plt.tight_layout() # Adjust layout for better spacing plt.pause(0.1) # Pause to update the plot
この関数は 2 つのサブプロットを生成します。1 つは価格、もう 1 つは数量です。 matplotlib ライブラリを使用してデータを動的に視覚化し、ユーザーがリアルタイムで市場動向を観察できるようにします。
結論
この例では ETL プロセスに焦点を当て、WebSocket API からデータを抽出し、分析のために変換し、データベースにロードし、即時のフィードバックのために視覚化する方法を示します。このフレームワークは、取引プラットフォームや市場分析ツールなど、リアルタイム データに基づいて情報に基づいた意思決定を行う必要があるアプリケーションを構築するために不可欠です。
以上がリアルタイム データを使用した ETL プロセスの理解: 抽出、変換、読み込み、視覚化の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。