Detailed explanation of PHP implementation of producers and consumers (Kafka application)

藏色散人
Release: 2023-04-09 20:26:02
forward
4767 people have browsed it

This article introduces PHP to producers and consumers, I hope to help friends in need!

Preface

Using Kafka in PHP requires the RdKafka extension, and RdKafka depends on librdkafka, so we need to install both of them. The specific installation method is from Baidu. This article No further explanation.

Producer (test)

The steps required to create a consumer:

  • Producer configuration parameters
  • Create production Producer instance
  • Create topic instance (depends on producer)
  • Produce topic message
  • Push message

The specific code is as follows:

$conf = new \RdKafka\Conf(); // 绑定服务节点 $conf->set('metadata.broker.list', '127.0.0.1:32772'); // 创建生产者 $kafka = new \RdKafka\Producer($conf); // 创建主题实例 $topic = $kafka->newTopic('p1r1'); // 生产主题数据,此时消息在缓冲区中,并没有真正被推送 $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message'); // 阻塞时间(毫秒), 0为非阻塞 $kafka->poll(0); // 推送消息,如果不调用此函数,消息不会被发送且会丢失 $result = $kafka->flush(5000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException('Was unable to flush, messages might be lost!'); }
Copy after login

Consumer

Creating a consumer requires several steps:

  • Consumer configuration parameters
  • Apply configuration parameters to create a consumer User instance
  • Subscribe to the corresponding topic
  • Pull data
  • Submit displacement

The specific code is as follows:

$conf = new \RdKafka\Conf(); // 绑定消费者组 $conf->set('group.id', 'ceshi'); // 绑定服务节点,多个用,分隔 $conf->set('metadata.broker.list', '127.0.0.1:32787'); // 设置自动提交为false $conf->set('enable.auto.commit', 'false'); // 设置当前消费者拉取数据时的偏移量, 可选参数: // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。 // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。 $conf->set('auto.offset.reset', 'earliest'); // 创建消费者实例 $consumer = new \RdKafka\KafkaConsumer($conf); // 消费者订阅主题,数组形式 $consumer->subscribe(['topic1','topic2']); while (true) { // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环) $message = $consumer->consume(5000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: // 业务逻辑 var_dump($message); // 提交位移 $consumer->commit($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } // 关闭消费者(一般用在脚本中,不需要关闭) $conumser->close();
Copy after login

Consumption only Data in the specified partition:

// 对消费者指定分区,注意此方式不能与subscribe一同使用 $consumer->assign([ new RdKafka\TopicPartition("topic", 0), new RdKafka\TopicPartition("topic", 1), ]);
Copy after login

The above is the detailed content of Detailed explanation of PHP implementation of producers and consumers (Kafka application). For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:learnku.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!