首页 后端开发 php教程 教你如何快速进行php+kafka的安装

教你如何快速进行php+kafka的安装

Jun 30, 2021 am 09:39 AM
kafka php

我们学习了解了这么多关于PHP的知识,今天教你们如何快速进行php+kafka的安装,如果不会的“童鞋”,那就跟随本篇文章一起继续学习吧

1、 安装java,并设置相关的环境变量

> wget https://download.java.net/openjdk/jdk7u75/ri/openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> tar zxvf openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz
> mv java-se-7u75-ri/ /opt/
> export JAVA_HOME=/opt/java-se-7u75-ri
> export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
> export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar

#验证安装
> java -verison
openjdk version "1.7.0_75"
OpenJDK Runtime Environment (build 1.7.0_75-b13)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)

2、安装kafka,这里以0.10.2版本为例

> wget http://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
> tar zxvf kafka_2.11-0.10.2.0.tgz
> mv kafka_2.11-0.10.2.0/ /opt/kafka
> cd /opt/kafka

#启动zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

#启动kafka
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

#尝试创建一个topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

#生产者写入消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

#消费者消费消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

3、安装kafka的C操作库

> wget https://github.com/edenhill/librdkafka/archive/v1.3.0.tar.gz
> tar zxvf v1.3.0.tar.gz
> cd librdkafka-1.3.0
> ./configure
> make && make install

4、安装php的kafka扩展 ,这里选择php-rdkafka扩展 https://github.com/arnaud-lb/php-rdkafka

> wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz
> tar 4.0.2.tar.gz
> cd php-rdkafka-4.0.2
> /opt/php7/bin/phpize
> ./configure --with-php-config=/opt/php7/bin/php-config
> make && make install

修改php.ini,加入 extension=rdkafka.so

5、安装rdkafka的IDE代码提示文件

> composer create-project kwn/php-rdkafka-stubs php-rdkafka-stubs

以phpstrom为例,在你的项目的External Libraries右键,选择Configure PHP Include Paths,把刚刚的路径添加进来。

6、编写php测试代码

producer:

<?php
$conf = new RdKafka\Conf();
$conf->set(&#39;log_level&#39;, LOG_ERR);
$conf->set(&#39;debug&#39;, &#39;admin&#39;);
$conf->set(&#39;metadata.broker.list&#39;, &#39;localhost:9092&#39;);

//If you need to produce exactly once and want to keep the original produce order, uncomment the line below
//$conf->set(&#39;enable.idempotence&#39;, &#39;true&#39;);

$producer = new RdKafka\Producer($conf);

$topic = $producer->newTopic("test2");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException(&#39;Was unable to flush, messages might be lost!&#39;);
}

low-level consumer:

<?php

$conf = new RdKafka\Conf();
$conf->set(&#39;log_level&#39;, LOG_ERR);
$conf->set(&#39;debug&#39;, &#39;admin&#39;);

// Set the group id. This is required when storing offsets on the broker
$conf->set(&#39;group.id&#39;, &#39;myConsumerGroup&#39;);

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
$topicConf->set(&#39;auto.commit.interval.ms&#39;, 100);

// Set the offset store method to &#39;file&#39;
$topicConf->set(&#39;offset.store.method&#39;, &#39;broker&#39;);

// Alternatively, set the offset store method to &#39;none&#39;
// $topicConf->set(&#39;offset.store.method&#39;, &#39;none&#39;);

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// &#39;smallest&#39;: start from the beginning
$topicConf->set(&#39;auto.offset.reset&#39;, &#39;smallest&#39;);

$topic = $rk->newTopic("test2", $topicConf);

// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            print_r($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

high-level consumer:

<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
             echo "Revoke: ";
             var_dump($partitions);
             $kafka->assign(NULL);
             break;

         default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set(&#39;group.id&#39;, &#39;myConsumerGroup&#39;);

// Initial list of Kafka brokers
$conf->set(&#39;metadata.broker.list&#39;, &#39;127.0.0.1&#39;);

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// &#39;smallest&#39;: start from the beginning
$conf->set(&#39;auto.offset.reset&#39;, &#39;smallest&#39;);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic &#39;test2&#39;
$consumer->subscribe([&#39;test2&#39;]);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            print_r($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
    sleep(2);
}

 推荐学习:《PHP视频教程

以上是教你如何快速进行php+kafka的安装的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

在PHP中评论代码 在PHP中评论代码 Jul 18, 2025 am 04:57 AM

PHP注释代码常用方法有三种:1.单行注释用//或#屏蔽一行代码,推荐使用//;2.多行注释用/.../包裹代码块,不可嵌套但可跨行;3.组合技巧注释如用/if(){}/控制逻辑块,或配合编辑器快捷键提升效率,使用时需注意闭合符号和避免嵌套。

撰写PHP评论的提示 撰写PHP评论的提示 Jul 18, 2025 am 04:51 AM

写好PHP注释的关键在于明确目的与规范,注释应解释“为什么”而非“做了什么”,避免冗余或过于简单。1.使用统一格式,如docblock(/*/)用于类、方法说明,提升可读性与工具兼容性;2.强调逻辑背后的原因,如说明为何需手动输出JS跳转;3.在复杂代码前添加总览性说明,分步骤描述流程,帮助理解整体思路;4.合理使用TODO和FIXME标记待办事项与问题,便于后续追踪与协作。好的注释能降低沟通成本,提升代码维护效率。

快速PHP安装教程 快速PHP安装教程 Jul 18, 2025 am 04:52 AM

ToinstallPHPquickly,useXAMPPonWindowsorHomebrewonmacOS.1.OnWindows,downloadandinstallXAMPP,selectcomponents,startApache,andplacefilesinhtdocs.2.Alternatively,manuallyinstallPHPfromphp.netandsetupaserverlikeApache.3.OnmacOS,installHomebrew,thenrun'bre

学习PHP:初学者指南 学习PHP:初学者指南 Jul 18, 2025 am 04:54 AM

易于效率,启动启动tingupalocalserverenverenvirestoolslikexamppandacodeeditorlikevscode.1)installxamppforapache,mysql,andphp.2)uscodeeditorforsyntaxssupport.3)

通过评论提高可读性 通过评论提高可读性 Jul 18, 2025 am 04:46 AM

写好注释的关键在于说明“为什么”而非仅“做了什么”,提升代码可读性。1.注释应解释逻辑原因,例如值选择或处理方式背后的考量;2.对复杂逻辑使用段落式注释,概括函数或算法的整体思路;3.定期维护注释确保与代码一致,避免误导,必要时删除过时内容;4.在审查代码时同步检查注释,并通过文档记录公共逻辑以减少代码注释负担。

编写有效的PHP评论 编写有效的PHP评论 Jul 18, 2025 am 04:44 AM

注释不能马虎是因为它要解释代码存在的原因而非功能,例如兼容老接口或第三方限制,否则看代码的人只能靠猜。必须加注释的地方包括复杂的条件判断、特殊的错误处理逻辑、临时绕过的限制。写注释更实用的方法是根据场景选择单行注释或块注释,函数、类、文件开头用文档块注释说明参数与返回值,并保持注释更新,对复杂逻辑可在前面加一行概括整体意图,同时不要用注释封存代码而应使用版本控制工具。

掌握PHP块评论 掌握PHP块评论 Jul 18, 2025 am 04:35 AM

PHPblockcommentsareusefulforwritingmulti-lineexplanations,temporarilydisablingcode,andgeneratingdocumentation.Theyshouldnotbenestedorleftunclosed.BlockcommentshelpindocumentingfunctionswithPHPDoc,whichtoolslikePhpStormuseforauto-completionanderrorche

PHP开发环境设置 PHP开发环境设置 Jul 18, 2025 am 04:55 AM

第一步选择集成环境包XAMPP或MAMP搭建本地服务器;第二步根据项目需求选择合适的PHP版本并配置多版本切换;第三步选用VSCode或PhpStorm作为编辑器并搭配Xdebug进行调试;此外还需安装Composer、PHP_CodeSniffer、PHPUnit等工具辅助开发。

See all articles