If you use something, you have to write it down, otherwise you forget it after a while. So here is a record of the installation process from trying out Kafka and its PHP extension.
To be honest, for queues Redis fits PHP better and is easy to use, but with Redis each message can only be consumed by a single consumer. Kafka, on the other hand, does not officially support PHP; the PHP clients are all written by enthusiasts or users. Let's start with the Kafka installation. I use 64-bit CentOS 6.4 as the example.
1. First confirm whether the JDK is installed
Use the command:
[root@localhost ~]# java -version
java version "1.8.0_73"
Java(TM) SE Runtime Environment (build 1.8.0_73-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)
If you see output like the above, the JDK is already installed. Note that some JDK versions may not be compatible, so make sure you install a suitable one. If it is not installed, the JDK installation method is below:
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
Go to that address and download a JDK 8 build. I downloaded jdk-8u73-linux-x64.tar.gz and unzipped it to /usr/local/jdk/.
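The unpacking step looks roughly like this (a sketch assuming the tarball sits in the current directory and the target directory matches the path above):
[root@localhost ~]# mkdir -p /usr/local/jdk
[root@localhost ~]# tar -xzf jdk-8u73-linux-x64.tar.gz -C /usr/local/jdk/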
Then open the /etc/profile file
[root@localhost ~]# vim /etc/profile
Add the following lines to the file:
export JAVA_HOME=/usr/local/jdk/jdk1.8.0_73
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH
Finally
[root@localhost ~]# source /etc/profile
The JDK settings take effect at this point; you can verify with java -version.
2. Next install Kafka
1. Download Kafka
Go to http://kafka.apache.org/downloads.html to download the corresponding version. I am using kafka_2.9.1-0.8.2.2.tgz.
2. After downloading, unzip it to a directory of your choice
I extracted it to /usr/local/kafka/kafka_2.9.1-0.8.2.2
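For reference, the unpacking step looks roughly like this (a sketch assuming the tarball was downloaded to the current directory):
[root@localhost ~]# mkdir -p /usr/local/kafka
[root@localhost ~]# tar -xzf kafka_2.9.1-0.8.2.2.tgz -C /usr/local/kafka/
[root@localhost ~]# cd /usr/local/kafka/kafka_2.9.1-0.8.2.2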
3. Run Kafka with the default configuration
Start Zookeeper server
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/zookeeper-server-start.sh config/zookeeper.properties &
Start Kafka server
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-server-start.sh config/server.properties &
Run the console producer
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Run consumer
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This way, whatever you type on the producer side is received by the consumer immediately.
4. Connecting a producer or consumer from another machine
You need to set host.name in config/server.properties; otherwise connections from other machines will fail.
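For example, in config/server.properties (the IP below is just a placeholder; use the broker machine's real, externally reachable address):
host.name=192.168.1.100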
3. Kafka-PHP extension
After trying it out for a while, https://github.com/nmred/kafka-php works well enough.
I installed it using Composer; the following is an example:
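The Composer step is roughly the following (the package name is taken from the GitHub repository above; check its README for the exact version constraint to require):
[root@localhost ~]# composer require nmred/kafka-php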
producer.php
<?php
require 'vendor/autoload.php';

while (1) {
    $part = mt_rand(0, 1); // random partition id (not used below; partition 0 is hard-coded)
    $produce = \Kafka\Produce::getInstance('kafka0:2181', 3000);
    // get available partitions
    $partitions = $produce->getAvailablePartitions('topic_name');
    var_dump($partitions);
    // send message
    $produce->setRequireAck(-1);
    $produce->setMessages('topic_name', 0, array(date('Y-m-d H:i:s')));
    sleep(3);
}
consumer.php
<?php
require 'vendor/autoload.php';

$consumer = \Kafka\Consumer::getInstance('kafka0:2181');
$group = 'topic_name';
$consumer->setGroup($group);
$consumer->setFromOffset(true);
$consumer->setTopic('topic_name', 0);
$consumer->setMaxBytes(102400);
$result = $consumer->fetch();
print_r($result);
foreach ($result as $topicName => $partition) {
    foreach ($partition as $partId => $messageSet) {
        var_dump($partition->getHighOffset());
        foreach ($messageSet as $message) {
            var_dump((string)$message);
        }
        var_dump($partition->getMessageOffset());
    }
}
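To try it out, run the two scripts in separate terminals (assuming the PHP CLI is available and that the hostname kafka0 resolves to the machine running ZooKeeper):
[root@localhost ~]# php producer.php
[root@localhost ~]# php consumer.php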