Maison > Article > développement back-end > Comment mettre en œuvre une analyse boursière en temps réel à l'aide de PHP et Kafka
Avec le développement d'Internet et de la technologie, l'investissement numérique est devenu un sujet de préoccupation croissante pour les gens. De nombreux investisseurs continuent d’explorer et d’étudier des stratégies d’investissement, dans l’espoir d’obtenir un retour sur investissement plus élevé. Dans le domaine du trading d'actions, l'analyse boursière en temps réel est très importante pour la prise de décision, et l'utilisation de la file d'attente de messages en temps réel Kafka et de la technologie PHP constitue un moyen efficace et pratique.
1. Introduction à Kafka
Kafka est un système de messagerie de publication et d'abonnement distribué à haut débit développé par LinkedIn. Les principales fonctionnalités de Kafka sont des données en temps réel élevées, une vitesse de traitement rapide et la prise en charge des groupes d'abonnés aux messages pour réaliser la multidiffusion de messages. Les principaux composants de Kafka sont le courtier, le producteur et le consommateur.
2. Introduction à PHP
PHP est un langage de script largement utilisé dans le développement d'applications Web côté serveur. PHP présente les caractéristiques d'une syntaxe simple, d'une vitesse d'exécution rapide, d'une facilité d'apprentissage et d'utilisation, etc. C'est l'un des langages de programmation couramment utilisés dans le développement d'applications Web.
3. Comment utiliser Kafka et PHP pour réaliser une analyse boursière en temps réel
<?php $conf = new RdKafkaConf(); $rk = new RdKafkaProducer($conf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("kafka-broker1:9092,kafka-broker2:9092"); $topic = $rk->newTopic("stock-market"); // 生产一条数据 $messagePayload = '{"time": "2021-01-01 10:00:00", "symbol": "AAPL", "price": 125.67}'; $topic->produce(RD_KAFKA_PARTITION_UA, 0, $messagePayload); $rk->flush(1000); ?>Dans le code ci-dessus, nous créons d'abord une instance de producteur Kafka et utilisons la méthode addBrokers( ) précise l'adresse de Kafka Broker. Ensuite, nous avons créé un objet de sujet Kafka et utilisé la méthode Produce() pour écrire un élément de données au format JSON dans ce sujet. Enfin, la persistance du message est assurée en appelant la méthode flush().
<?php $conf = new RdKafkaConf(); $rk = new RdKafkaConsumer($conf); $rk->addBrokers("kafka-broker1:9092,kafka-broker2:9092"); $topicConf = new RdKafkaTopicConf(); $topicConf->set("auto.commit.interval.ms", 100); $topicConf->set("offset.store.method", "broker"); $topicConf->set("auto.offset.reset", "smallest"); $topic = $rk->newTopic("stock-market", $topicConf); // 消费数据 $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $msg = $topic->consume(0, 1000); switch ($msg->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo "Received message: " . $msg->payload . " (" . $msg->len . " bytes) "; break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more "; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out "; break; default: echo "Error: " . $msg->errstr . " "; break; } } ?>Dans le code ci-dessus, nous créons d'abord une instance de consommateur et utilisons addBrokers() La méthode précise l'adresse de Kafka Broker. Ensuite, nous créons un objet de rubrique Kafka et utilisons la méthode consumeStart() pour démarrer la consommation. Enfin, consommez les données JSON de cette rubrique en appelant la méthode consume().
<?php //读取配置文件数据信息,并连接 Redis $redisConfig = require(__DIR__ . "/config/redis.php"); $client = new PredisClient([ "scheme" => "tcp", "host" => $redisConfig["host"], "port" => $redisConfig["port"] ]); //设置消费者 $conf = new RdKafkaConf(); $rkConsumer = new RdKafkaConsumer($conf); $rkConsumer->addBrokers($kafkaBrokerAddress); $topicConsumerConf = new RdKafkaTopicConf(); $topicConsumerConf->set("auto.commit.interval.ms", 100); $topicConsumerConf->set("offset.store.method", "broker"); $topicConsumerConf->set("auto.offset.reset", "earliest"); $topic = $rkConsumer->newTopic($kafkaTopic, $topicConsumerConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); //标记数据是否重复 $lastProcessedMessage = array(); while (true) { $msg = $topic->consume(0, 1000); if (empty($msg)) { // 无消息 continue; } if ($msg->err == RD_KAFKA_RESP_ERR_NO_ERROR) { $msgJson = json_decode($msg->payload, true); if (in_array($msgJson, $lastProcessedMessage)) { // 重复消息 continue; } //写入redis中库存信息 $redisKey = sprintf("%s:%s", "stock-market", $msgJson["symbol"]); $client->zadd($redisKey, time(), $msg->payload); $lastProcessedMessage[] = $msgJson; } }Dans l'exemple de code ci-dessus, nous utilisons l'API Consumer de Kafka pour consommer les données au format JSON dans le sujet, puis utilisons Redis pour le stockage et le tri des données. La méthode de stockage consiste à utiliser le type de données de l'ensemble trié, à utiliser le code boursier comme clé, à utiliser l'horodatage comme valeur et à utiliser la méthode zadd() pour écrire les informations boursières dans Redis. Après avoir collecté et stocké les données boursières, vous pouvez utiliser des bibliothèques de graphiques telles que Chart.js pour afficher les données sur l'interface utilisateur afin de permettre aux utilisateurs d'effectuer une analyse boursière en temps réel. 4. Résumé Cet article présente comment utiliser Kafka et PHP pour mettre en œuvre une analyse de stock en temps réel, et montre la création de producteurs et de consommateurs à travers des exemples de code, et comment utiliser Redis pour traiter et stocker les données boursières en temps réel. Sur cette base, nous avons également exploré comment utiliser les bibliothèques de graphiques pour visualiser les données boursières. Il s'agit d'une technologie très pratique qui peut être utilisée pour obtenir et analyser rapidement des données boursières afin de prendre des décisions d'investissement meilleures et plus favorables.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!