Dengan pembangunan berterusan aplikasi Internet moden, semakin banyak aplikasi perlu mengendalikan jumlah komunikasi data yang besar. Cara tradisional mengendalikan komunikasi data ini adalah dengan menggunakan undian atau menyekat I/O, tetapi kaedah ini tidak lagi dapat memenuhi keperluan aplikasi moden kerana ia sangat tidak cekap. Bagi menyelesaikan masalah ini, industri telah membangunkan teknologi yang dipanggil baris gilir mesej dan sistem pengedaran.
Dalam sistem baris gilir dan pengedaran mesej, pengeluar mesej menghantar mesej ke baris gilir, manakala pengguna mesej memperoleh mesej daripada baris gilir dan melakukan operasi yang sepadan. Pendekatan ini boleh meningkatkan kecekapan komunikasi data kerana ia boleh mengelakkan masalah seperti pengundian dan menyekat I/O.
Dalam artikel ini, kita akan membincangkan cara untuk mencapai baris gilir dan pengedaran mesej yang cekap menggunakan PHP dan penyepaduan Apache Kafka.
Pengenalan kepada Apache Kafka
Apache Kafka ialah sistem pemesejan teragih berkemampuan tinggi, kependaman rendah, berskala. Ia boleh mengendalikan jumlah mesej yang besar dan skala secara mendatar untuk menampung beban yang lebih tinggi. Komponen utama Apache Kafka termasuk:
PHP disepadukan dengan Apache Kafka
Untuk menggunakan Apache Kafka, kita perlu menggunakan sambungan Kafka untuk PHP. Sambungan ini menyediakan semua API yang diperlukan PHP untuk mengendalikan Kafka.
Pertama, kita perlu memasang sambungan Kafka, yang boleh kita pasang dari PECL:
pecl install kafka
Selepas memasang sambungan, anda boleh mula menggunakannya. Berikut ialah contoh ringkas pengeluaran dan penggunaan mesej menggunakan PHP dan Apache Kafka:
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka生产者 $producer = new RdKafkaProducer(); $producer->setLogLevel(LOG_DEBUG); $producer->addBrokers($brokers); // 创建一个Kafka消费者 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $consumer = new RdKafkaConsumer($conf); $consumer->addBrokers($brokers); // 生产消息 $topicProducer = $producer->newTopic($topic); for ($i = 0; $i < 10; $i++) { $topicProducer->produce(RD_KAFKA_PARTITION_UA, 0, 'Message ' . $i); } // 消费消息 $topicConsumer = $consumer->newTopic($topic); $topicConsumer->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); while (true) { $message = $topicConsumer->consume(0, 1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo $message->payload . PHP_EOL; }
Dalam contoh ini, kami mula-mula mencipta pengeluar Kafka dan pengguna Kafka. Kemudian, dalam pengeluar, kami menghantar 10 mesej ke topik yang ditentukan; dalam pengguna, kami menggunakan mesej dari topik yang ditentukan dan mengeluarkan kandungannya.
Pada ketika ini, kami telah berjaya melaksanakan pengeluaran dan penggunaan mesej ringkas menggunakan PHP dan Apache Kafka. Seterusnya, kami akan membincangkan cara melaksanakan fungsi yang lebih maju menggunakan PHP dan Apache Kafka.
Contoh aplikasi lanjutan
Dalam aplikasi sebenar, kami biasanya perlu melaksanakan beberapa fungsi lanjutan, seperti:
Di sini kita akan membincangkan bagaimana untuk melaksanakan fungsi ini.
Pengedaran Mesej
Dalam aplikasi praktikal, kita biasanya perlu mengawal aliran mesej Sebagai contoh, kita mungkin mahu hanya pengguna tertentu menggunakan mesej tertentu. Untuk mencapai kefungsian ini, kita boleh membuat baris gilir untuk setiap pengguna dan kemudian menetapkan mesej khusus kepada baris gilir tertentu.
Berikut adalah contoh yang menggunakan dua pengguna untuk menggunakan dua tugas berbeza.
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka消费者组 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $consumer = new RdKafkaKafkaConsumer($conf); $consumer->subscribe([$topic]); // 创建两个Kafka生产者,一个生产者用于向消费者1发送消息,另一个生产者用于向消费者2发送消息 $producer1 = new RdKafkaProducer(); $producer1->addBrokers($brokers); $producer1Topic = $producer1->newTopic($topic . '_1'); $producer2 = new RdKafkaProducer(); $producer2->addBrokers($brokers); $producer2Topic = $producer2->newTopic($topic . '_2'); // 消费消息 while (true) { $message = $consumer->consume(1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo 'Received message: ' . $message->payload . PHP_EOL; // 根据消息内容分配给不同的生产者 if ($message->payload === 'task1') { $producer1Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload); } elseif ($message->payload === 'task2') { $producer2Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload); } }
Dalam contoh ini, kami menggunakan dua pengeluar untuk mengedarkan mesej kepada dua pengguna berbeza. Apabila pengguna menerima mesej, kami boleh menetapkannya kepada pengeluar tertentu berdasarkan kandungan mesej. Kaedah ini boleh membantu kami mengawal aliran mesej dan mengelakkan pemprosesan mesej yang berlebihan.
Kumpulan Pengguna
Dalam pengguna Kafka biasa, pengguna yang berbeza dalam kumpulan yang sama menggunakan topik yang sama dan mereka akan menerima mesej yang sama. Ini kerana Kafka secara automatik mengimbangi partition dan memastikan setiap partition diproses oleh hanya seorang pengguna.
Dalam PHP, kita boleh menggunakan group.id untuk mengumpulkan pengguna untuk merealisasikan fungsi kumpulan pengguna.
Berikut ialah contoh kumpulan pengguna Kafka yang boleh memproses mesej dalam kumpulan yang sama secara selari:
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka消费者组 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $conf->set('metadata.broker.list', $brokers); $conf->set('enable.auto.commit', 'false'); $consumer = new RdKafkaKafkaConsumer($conf); // 添加需要订阅的topic $consumer->subscribe([$topic]); // 处理消息 while (true) { $message = $consumer->consume(1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo 'Received message: ' . $message->payload . PHP_EOL; // 处理完消息后手动提交offset $consumer->commit(); }
Dalam contoh ini, kami mencipta kumpulan pengguna Kafka dan menambah topik yang perlu dilanggan kepadanya. Kami kemudiannya boleh memproses mesej dalam kumpulan yang sama secara selari.
Nota: Dalam kumpulan pengguna, berbilang pengguna menggunakan satu atau lebih partition bersama-sama Apabila menggunakan data, anda perlu memberi perhatian kepada isu pemprosesan berbilang benang bagi data yang sama.
Konfigurasi offset
Dalam Kafka, setiap partition mempunyai offset bebas. Pengguna boleh mengawal di mana dalam partition ia membaca dan dengan itu mesej yang dibacanya. Pengguna boleh mula membaca dari mesej terakhir atau mesej terkini.
Dalam PHP, kita boleh menggunakan offset untuk mengawal kedudukan bacaan mesej. Berikut ialah contoh konfigurasi Offset:
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka消费者 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $consumer = new RdKafkaKafkaConsumer($conf); // 订阅topic $topicConf = new RdKafkaTopicConf(); $topicConf->set('auto.offset.reset', 'earliest'); $topic = $consumer->newTopic($topic, $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); // 消费消息 while (true) { $message = $topic->consume(0, 1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo 'Received message: ' . $message->payload . PHP_EOL; }
Dalam contoh ini, kami menggunakan auto.offset.reset untuk menetapkan konfigurasi offset. Konfigurasi ini memberitahu pengguna untuk mula menggunakan mesej dari offset terawal.
Dalam aplikasi praktikal, ofset berbeza boleh dikonfigurasikan mengikut keperluan. Sebagai contoh, selepas pengeluar gagal memproses beberapa mesej, kami mungkin perlu memulakan semula membaca mesej dari titik di mana mesej gagal diproses sebelum ini.
Kesimpulan
Dalam artikel ini, kami membincangkan cara mencapai baris gilir dan pengedaran mesej yang cekap menggunakan PHP dan penyepaduan Apache Kafka. Kami mula-mula memperkenalkan asas Apache Kafka, dan kemudian membincangkan cara melaksanakan pengeluaran dan penggunaan mesej menggunakan sambungan Kafka untuk PHP. Akhir sekali, kami membincangkan cara melaksanakan beberapa ciri lanjutan seperti pengedaran mesej, kumpulan pengguna dan konfigurasi mengimbangi.
Menggunakan penyepaduan PHP dan Apache Kafka membolehkan kami melaksanakan baris gilir dan pengedaran mesej yang cekap, dengan itu meningkatkan kelajuan tindak balas aplikasi dan pemprosesan. Jika anda sedang membangunkan aplikasi yang perlu mengendalikan jumlah komunikasi data yang besar, Apache Kafka dan sambungan Kafka untuk PHP mungkin merupakan pilihan yang baik.
Atas ialah kandungan terperinci Penyepaduan PHP dan Apache Kafka untuk baris gilir dan pengedaran mesej yang cekap. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!