教你如何快速进行php+kafka的安装
我们学习了解了这么多关于PHP的知识,今天教你们如何快速进行php+kafka的安装,如果不会的“童鞋”,那就跟随本篇文章一起继续学习吧
1、 安装java,并设置相关的环境变量
> wget https://download.java.net/openjdk/jdk7u75/ri/openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz > tar zxvf openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz > mv java-se-7u75-ri/ /opt/ > export JAVA_HOME=/opt/java-se-7u75-ri > export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin > export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar #验证安装 > java -verison openjdk version "1.7.0_75" OpenJDK Runtime Environment (build 1.7.0_75-b13) OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
2、安装kafka,这里以0.10.2版本为例
> wget http://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz > tar zxvf kafka_2.11-0.10.2.0.tgz > mv kafka_2.11-0.10.2.0/ /opt/kafka > cd /opt/kafka #启动zookeeper > bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ... #启动kafka > bin/kafka-server-start.sh config/server.properties [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ... #尝试创建一个topic > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test > bin/kafka-topics.sh --list --zookeeper localhost:2181 test #生产者写入消息 > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message #消费者消费消息 > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
3、安装kafka的C操作库
> wget https://github.com/edenhill/librdkafka/archive/v1.3.0.tar.gz > tar zxvf v1.3.0.tar.gz > cd librdkafka-1.3.0 > ./configure > make && make install
4、安装php的kafka扩展 ,这里选择php-rdkafka扩展 https://github.com/arnaud-lb/php-rdkafka
> wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz > tar 4.0.2.tar.gz > cd php-rdkafka-4.0.2 > /opt/php7/bin/phpize > ./configure --with-php-config=/opt/php7/bin/php-config > make && make install
修改php.ini,加入 extension=rdkafka.so
5、安装rdkafka的IDE代码提示文件
> composer create-project kwn/php-rdkafka-stubs php-rdkafka-stubs
以phpstrom为例,在你的项目的External Libraries右键,选择Configure PHP Include Paths,把刚刚的路径添加进来。


6、编写php测试代码
producer:
<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_ERR);
$conf->set('debug', 'admin');
$conf->set('metadata.broker.list', 'localhost:9092');
//If you need to produce exactly once and want to keep the original produce order, uncomment the line below
//$conf->set('enable.idempotence', 'true');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test2");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}low-level consumer:
<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_ERR);
$conf->set('debug', 'admin');
// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
// Set the offset store method to 'file'
$topicConf->set('offset.store.method', 'broker');
// Alternatively, set the offset store method to 'none'
// $topicConf->set('offset.store.method', 'none');
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("test2", $topicConf);
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
print_r($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}high-level consumer:
<?php
$conf = new RdKafka\Conf();
// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.1');
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$conf->set('auto.offset.reset', 'smallest');
$consumer = new RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'test2'
$consumer->subscribe(['test2']);
echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";
while (true) {
$message = $consumer->consume(1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
print_r($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
sleep(2);
}推荐学习:《PHP视频教程》
以上是教你如何快速进行php+kafka的安装的详细内容。更多信息请关注PHP中文网其他相关文章!
热AI工具
Undress AI Tool
免费脱衣服图片
Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片
AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。
Clothoff.io
AI脱衣机
Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!
热门文章
热工具
记事本++7.3.1
好用且免费的代码编辑器
SublimeText3汉化版
中文版,非常好用
禅工作室 13.0.1
功能强大的PHP集成开发环境
Dreamweaver CS6
视觉化网页开发工具
SublimeText3 Mac版
神级代码编辑软件(SublimeText3)
热门话题
在PHP中评论代码
Jul 18, 2025 am 04:57 AM
PHP注释代码常用方法有三种:1.单行注释用//或#屏蔽一行代码,推荐使用//;2.多行注释用/.../包裹代码块,不可嵌套但可跨行;3.组合技巧注释如用/if(){}/控制逻辑块,或配合编辑器快捷键提升效率,使用时需注意闭合符号和避免嵌套。
撰写PHP评论的提示
Jul 18, 2025 am 04:51 AM
写好PHP注释的关键在于明确目的与规范,注释应解释“为什么”而非“做了什么”,避免冗余或过于简单。1.使用统一格式,如docblock(/*/)用于类、方法说明,提升可读性与工具兼容性;2.强调逻辑背后的原因,如说明为何需手动输出JS跳转;3.在复杂代码前添加总览性说明,分步骤描述流程,帮助理解整体思路;4.合理使用TODO和FIXME标记待办事项与问题,便于后续追踪与协作。好的注释能降低沟通成本,提升代码维护效率。
快速PHP安装教程
Jul 18, 2025 am 04:52 AM
ToinstallPHPquickly,useXAMPPonWindowsorHomebrewonmacOS.1.OnWindows,downloadandinstallXAMPP,selectcomponents,startApache,andplacefilesinhtdocs.2.Alternatively,manuallyinstallPHPfromphp.netandsetupaserverlikeApache.3.OnmacOS,installHomebrew,thenrun'bre
学习PHP:初学者指南
Jul 18, 2025 am 04:54 AM
易于效率,启动启动tingupalocalserverenverenvirestoolslikexamppandacodeeditorlikevscode.1)installxamppforapache,mysql,andphp.2)uscodeeditorforsyntaxssupport.3)
通过评论提高可读性
Jul 18, 2025 am 04:46 AM
写好注释的关键在于说明“为什么”而非仅“做了什么”,提升代码可读性。1.注释应解释逻辑原因,例如值选择或处理方式背后的考量;2.对复杂逻辑使用段落式注释,概括函数或算法的整体思路;3.定期维护注释确保与代码一致,避免误导,必要时删除过时内容;4.在审查代码时同步检查注释,并通过文档记录公共逻辑以减少代码注释负担。
编写有效的PHP评论
Jul 18, 2025 am 04:44 AM
注释不能马虎是因为它要解释代码存在的原因而非功能,例如兼容老接口或第三方限制,否则看代码的人只能靠猜。必须加注释的地方包括复杂的条件判断、特殊的错误处理逻辑、临时绕过的限制。写注释更实用的方法是根据场景选择单行注释或块注释,函数、类、文件开头用文档块注释说明参数与返回值,并保持注释更新,对复杂逻辑可在前面加一行概括整体意图,同时不要用注释封存代码而应使用版本控制工具。
掌握PHP块评论
Jul 18, 2025 am 04:35 AM
PHPblockcommentsareusefulforwritingmulti-lineexplanations,temporarilydisablingcode,andgeneratingdocumentation.Theyshouldnotbenestedorleftunclosed.BlockcommentshelpindocumentingfunctionswithPHPDoc,whichtoolslikePhpStormuseforauto-completionanderrorche
PHP开发环境设置
Jul 18, 2025 am 04:55 AM
第一步选择集成环境包XAMPP或MAMP搭建本地服务器;第二步根据项目需求选择合适的PHP版本并配置多版本切换;第三步选用VSCode或PhpStorm作为编辑器并搭配Xdebug进行调试;此外还需安装Composer、PHP_CodeSniffer、PHPUnit等工具辅助开发。


