情報爆発時代の到来により、データの使用と処理はますます重要になっています。ストリーミング データ処理は、大量のデータを処理する重要な方法の 1 つになっています。 PHP 開発者は、リアルタイム データの処理に関する経験とニーズを持っている必要があります。この記事では、PHP と Google Cloud Dataflow を使用してストリーミング データの処理と管理を行う方法を紹介します。
1. Google Cloud Dataflow の概要
Google Cloud Dataflow は、大規模なデータ処理タスクを管理するクラウド サービスです。大規模なデータ フローを効率的に処理できると同時に、バッチとストリームも可能です。処理が混在しています。
Google Cloud Dataflow には次の特徴があります:
Google Cloud SDK のインストール
$ gcloud config set project [PROJECT_ID]
gRPC 拡張機能
$ pecl install grpc
Protobuf 拡張機能
$ pecl install protobuf
Dataflow PHP 拡張機能
$ pecl install google-cloud-dataflow-alpha
<?php require __DIR__ . '/vendor/autoload.php'; use GoogleCloudBigQueryBigQueryClient; use GoogleCloudDataflowDataflowClient; use GoogleCloudDataflowPubSubPubSubOptions; use GoogleCloudPubSubPubSubClient; use GoogleCloudDataflowOptions; $configs = include __DIR__ . '/config.php'; $inputTopic = $configs['input_topic']; $outputTable = $configs['output_table']; $project = $configs['project_id']; $bucket = $configs['bucket']; $stagingLocation = $configs['staging_location']; $tempLocation = $configs['temp_location']; $jobName = 'test-job'; $options = [ 'project' => $project, 'stagingLocation' => $stagingLocation, 'tempLocation' => $tempLocation, 'jobName' => $jobName, ]; $pubsub = new PubSubClient([ 'projectId' => $project ]); $pubsub_topic = $pubsub->topic($inputTopic); $bigquery = new BigQueryClient([ 'projectId' => $project ]); $dataset = $bigquery->dataset('test_dataset'); $table = $dataset->table($outputTable); $table->create([ 'schema' => [ [ 'name' => 'id', 'type' => 'STRING', ], [ 'name' => 'timestamp', 'type' => 'TIMESTAMP', ], [ 'name' => 'message', 'type' => 'STRING', ], ], ]); $dataflow = new DataflowClient(); $pubsubOptions = PubSubOptions::fromArray([ 'topic' => sprintf('projects/%s/topics/%s', $project, $inputTopic), ]); $options = [ Options::PROJECT => $project, Options::STAGING_LOCATION => $stagingLocation, Options::TEMP_LOCATION => $tempLocation, Options::JOB_NAME => $jobName, ]; $job = $dataflow->createJob([ 'projectId' => $project, 'name' => $jobName, 'environment' => [ 'tempLocation' => sprintf('gs://%s/temp', $bucket), ], 'steps' => [ [ 'name' => 'Read messages from Pub/Sub', 'pubsubio' => (new GoogleCloudDataflowIoPubsubPubsubMessage()) ->expand($pubsubOptions) ->withAttributes(false) ->withIdAttribute('unique_id') ->withTimestampAttribute('publish_time') ], [ 'name' => 'Write messages to BigQuery', 'bigquery' => (new GoogleCloudDataflowIoBigQueryBigQueryWrite()) ->withJsonSchema(file_get_contents(__DIR__ . '/schema.json')) ->withTable($table->tableId()) ], ] ]); $operation = $job->run(); # Poll the operation until it is complete $operation->pollUntilComplete(); if (!$operation->isComplete()) { exit(1); } if ($operation->getError()) { print_r($operation->getError()); exit(1); } echo "Job has been launched";
$ php dataflow.php
以上がPHP と Google Cloud Dataflow を使用してストリーミング データの処理と管理を行う方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。