Ces dernières années, la demande de traitement de données en temps réel n'a cessé de croître. Les technologies de démarrage à froid et par lots ne peuvent plus répondre aux besoins de traitement des données en temps réel. Par conséquent, de plus en plus d’entreprises se tournent vers la technologie de traitement des données en temps réel. Cet article explique comment utiliser PHP et Kafka pour réaliser un traitement de données en temps réel.
Kafka est une plateforme de traitement de flux distribué à haut débit développée à l'origine par LinkedIn. Kafka peut être utilisé pour créer de nouveaux traitements de flux, traitements par lots, systèmes de messagerie, systèmes de coordination, etc.
PHP est un langage de programmation dynamique populaire largement utilisé pour créer des applications Internet. Bien que PHP ne soit pas le premier choix pour le traitement des données en temps réel, il est largement utilisé dans le développement Web et le traitement des données.
Nous allons maintenant présenter les étapes à suivre pour mettre en œuvre le traitement des données en temps réel à l'aide de PHP et Kafka.
Étape 1 : Installer et configurer PHP
Avant de démarrer le traitement des données en temps réel de PHP, nous devons installer l'environnement PHP et ajouter les extensions PHP nécessaires, telles que les extensions Kafka et Extension Redis.
L'extension Kafka peut être téléchargée et installée à partir de ce lien kafka, pecl install kafka install l'extension kafka.
Extension Redis Vous pouvez télécharger et installer l'extension PHP Redis à partir d'ici, ou vous pouvez utiliser PECL pour l'installer, commande : pecl install redis.
Après avoir installé et configuré l'extension PHP, nous pouvons commencer à écrire des programmes de traitement de données en temps réel.
Étape 2 : Connectez-vous à Kafka
Utilisez les producteurs Kafka et les consommateurs Kafka pour connecter les flux de données dans Kafka afin de transférer les données vers le « pipeline de données ». En PHP, nous pouvons utiliser les classes KafkaProducer et KafkaConsumer fournies par Kafka et les instancier pour nous connecter à Kafka.
L'exemple de code est le suivant :
set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息 $kafkaProducer = new RdKafkaProducer($kafkaConf); $kafkaConsumer = new RdKafkaConsumer($kafkaConf); $topic = $kafkaProducer->newTopic('sample'); ?>
Étape 3 : Lecture des données
Nous pouvons utiliser la classe KafkaConsumer pour obtenir des flux de données en temps réel . Dans Kafka, il existe le concept de flux, qui divise le flux de données en une ou plusieurs partitions, chaque partition étant constituée d'une partition maître et de zéro ou plusieurs partitions esclaves. En PHP, on peut utiliser la classe KafkaConsumer pour instancier un objet consommateur et s'abonner à une ou plusieurs partitions pour lire des données.
L'exemple de code est le suivant :
set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息 $kafkaConsumer = new RdKafkaConsumer($kafkaConf); $topicConf = new RdKafkaTopicConf(); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $kafkaConsumer->newTopic('sample', $topicConf); var_dump($topic->getMetadata(true, 10000)); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 1000); if (null !== $message) { print_r($message->payload); } } ?>
Étape 4 : Traitement des données
Après avoir reçu les données, nous pouvons traiter les données et les stocker en mémoire. Nous pouvons utiliser Redis pour stocker des données et les conserver en toute sécurité en actualisant régulièrement les données dans la base de données au moment approprié.
L'exemple de code est le suivant :
set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息 $kafkaConsumer = new RdKafkaConsumer($kafkaConf); $topicConf = new RdKafkaTopicConf(); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $kafkaConsumer->newTopic('sample', $topicConf); $redisClient = new Redis(); $redisClient->connect('127.0.0.1', 6379); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 1000); if (null !== $message) { $data = json_decode($message->payload); $redisClient->hMSet('my_data', [ $data->key1 => $data->value1, $data->key2 => $data->value2, ]); } } ?>
Étape 5 : Synchronisation des données
Enfin, nous devons vider le flux de données en temps réel à notre base de données. Nous pouvons utiliser une minuterie et un processus PHP pour vider périodiquement le cache Redis dans la base de données.
L'exemple de code est le suivant :
set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息 $kafkaConsumer = new RdKafkaConsumer($kafkaConf); $topicConf = new RdKafkaTopicConf(); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $kafkaConsumer->newTopic('sample', $topicConf); $redisClient = new Redis(); $redisClient->connect('127.0.0.1', 6379); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); $count = 0; while (true) { $message = $topic->consume(0, 1000); if (null !== $message) { $data = json_decode($message->payload); $redisClient->hMSet('my_data', [ $data->key1 => $data->value1, $data->key2 => $data->value2, ]); $count++; if ($count == 5) { $count = 0; $allData = $redisClient->hGetAll('my_data'); //将数据更新到数据库中 //... } } } ?>
Conclusion
Dans cet article, nous avons présenté comment implémenter le traitement des données en temps réel à l'aide de PHP et Kafka. Utilisez Kafka pour diffuser facilement des données en temps réel dans votre pipeline de données et utilisez PHP pour traiter et stocker les données. Nous utilisons également Redis comme cache et stockage en mémoire pour gérer les données en temps réel. Cette approche peut facilement remplacer les solutions de mise en cache et de messagerie tout en offrant des performances et une évolutivité supérieures.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!