> 백엔드 개발 > PHP 튜토리얼 > Kafka 소개와 PHP 기반 Kafka 설치 및 테스트

Kafka 소개와 PHP 기반 Kafka 설치 및 테스트

不言
풀어 주다: 2023-04-03 12:38:01
원래의
3696명이 탐색했습니다.

이 글의 내용은 Kafka의 소개와 PHP를 기반으로 한 Kafka의 설치 및 테스트에 관한 내용입니다. 내용이 매우 자세하게 설명되어 있으니 도움이 필요한 친구들에게 도움이 되었으면 좋겠습니다.

소개

Kafka는 처리량이 높은 분산 게시-구독 메시징 시스템입니다

꼭 알아야 할 Kafka 역할

Producer: producer.
소비자: 소비자.
주제: 메시지는 주제 카테고리에 기록됩니다. Kafka는 메시지 시드(피드)를 분류하고 각 메시지 유형을 주제라고 합니다.
브로커: 클러스터에서 실행되며 하나 이상의 서비스로 구성될 수 있습니다. 각 서비스를 브로커라고 하며 소비자는 하나 이상의 주제를 구독하고 브로커에서 데이터를 가져와 게시된 정보를 사용할 수 있습니다.

클래식 모델

1. 주제 아래의 파티션은 소비자 수보다 작을 수 없습니다. 즉, 주제 아래의 소비자 수가 파티션보다 클 수 없습니다. 더 크면 유휴 시간이 낭비됩니다. 2. 한 주제 아래의 파티션은 동시에 소비될 수 있습니다. 다른 소비자 그룹의 특정 소비자는
3. 주제 아래의 파티션은 동일한 소비자 그룹의 한 소비자만 소비할 수 있습니다.

Kafka 소개와 PHP 기반 Kafka 설치 및 테스트

공통 매개변수 설명

request.required.acks

Kafka 생산자의 ack에는 세 가지 메커니즘이 있습니다. 생산자를 초기화할 때 producerconfig는 request.required.acks의 다른 값을 구성하여 구현할 수 있습니다.

0: 이는 생산자가 다음(배치) 메시지 전송을 계속하기 위해 동기화가 완료되었다는 브로커의 확인을 기다리지 않음을 의미합니다. 이 옵션은 대기 시간은 가장 짧지만 내구성 보장은 가장 약합니다(리더가 죽는 등 서버 장애가 발생하면 일부 데이터가 손실되지만 생산자는 이를 모르고 브로커는 전송된 정보를 받을 수 없습니다).

1: 리더가 데이터를 성공적으로 수신하고 확인한 후 프로듀서가 다음 메시지를 보내는 것을 의미합니다. 이 옵션은 클라이언트가 서버에서 요청이 성공했는지 확인할 때까지 기다리기 때문에 더 나은 내구성을 제공합니다. (죽은 리더에 기록되었지만 아직 복제되지 않은 메시지만 손실됩니다.)

-1: 이는 팔로어 복사본이 데이터를 수신했음을 확인할 때까지 생산자가 전송을 완료하지 않음을 의미합니다.

이 옵션은 최고의 내구성을 제공하며, 하나 이상의 동기화된 복제본이 살아있는 한 정보가 손실되지 않음을 보장합니다.

세 가지 메커니즘, 순서대로 성능이 감소하고(생산자 처리량 감소) 데이터 견고성이 증가합니다.

auto.offset.reset

1. early: 오프셋을 가장 빠른 오프셋으로 자동 재설정

2.latest: 오프셋을 최신 오프셋으로 자동 재설정(기본값)
3. 이전 오프셋을 찾을 수 없는 경우 소비자 그룹에서는 소비자에게 예외가 발생합니다.
4. 기타 매개변수: 소비자에게 예외(잘못된 매개변수) 발생

kafka 설치 및 간단한 테스트

kafka 설치(설치 필요 없음, 압축 풀기)

# 官方下载地址:http://kafka.apache.org/downloads
# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
tar -xzf kafka_2.12-1.1.1.tgz
cd kafka_2.12-1.1.0
로그인 후 복사

kafka 서버 시작

# 需先启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
로그인 후 복사

kafka 클라이언트 테스트 시작

# 创建一个话题,test话题2个分区
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".

# 显示所有话题
bin/kafka-topics.sh --list --zookeeper localhost:2181
test

# 显示话题信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0


# 启动一个生产者(输入消息)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[等待输入自己的内容 出现>输入即可]
>i am a new msg !
>i am a good msg ?

# 启动一个生产者(等待消息) 
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?
로그인 후 복사

설치 Kafka의 PHP 확장

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

vim [php]/php.ini
extension=rdkafka.so
로그인 후 복사

php 코드 연습

producer

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);

$option = 'qkl';
for ($i = 0; $i produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
}


$len = $rk->getOutQLen();
while ($len > 0) {
    $len = $rk->getOutQLen();
    var_dump($len);
    $rk->poll(50);
}
로그인 후 복사

run producer

php producer.php
# output

int(20)
int(20)
int(20)
int(20)
int(0)

# 你可以查看你刚才上面启动的消费者shell应该会输出消息
qkl . 0
qkl . 1
qkl . 2
qkl . 3
qkl . 4
qkl . 5
qkl . 6
qkl . 7
qkl . 8
qkl . 9
qkl . 10
qkl . 11
qkl . 12
qkl . 13
qkl . 14
qkl . 15
qkl . 16
qkl . 17
qkl . 18
qkl . 19
로그인 후 복사

consumer

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

//设置消费组
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
//在interval.ms的时间内自动提交确认、建议不要启动
//$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.enable', 0);
$topicConf->set('auto.commit.interval.ms', 100);

// 设置offset的存储为file
//$topicConf->set('offset.store.method', 'file');
// 设置offset的存储为broker
 $topicConf->set('offset.store.method', 'broker');
//$topicConf->set('offset.store.path', __DIR__);

//smallest:简单理解为从头开始消费,其实等价于上面的 earliest
//largest:简单理解为从最新的开始消费,其实等价于上面的 latest
//$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test", $topicConf);

// 参数1消费分区0
// RD_KAFKA_OFFSET_BEGINNING 重头开始消费
// RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
// RD_KAFKA_OFFSET_END 最后一条消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //
//$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    //参数1表示消费分区,这里是分区0
    //参数2表示同步阻塞多久
    $message = $topic->consume(0, 12 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}
로그인 후 복사

서버 메타데이터 보기(주제/파티션/브로커)

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});

$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$allInfo = $rk->metadata(true, NULL, 60e3);

$topics = $allInfo->getTopics();

echo rd_kafka_offset_tail(100);
echo "--";

echo count($topics);
echo "--";


foreach ($topics as $topic) {

    $topicName = $topic->getTopic();
    if ($topicName == "__consumer_offsets") {
        continue ;
    }

    $partitions = $topic->getPartitions();
    foreach ($partitions as $partition) {
//        $rf = new ReflectionClass(get_class($partition));
//        foreach ($rf->getMethods() as $f) {
//            var_dump($f);
//        }
//        die();
        $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());
        echo  "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
        echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;
    }
}
로그인 후 복사

관련 권장사항:

kafka 설치 및 사용- PHP 확장, kafkakafka-php 확장

kafka 어셈블리 및 Kafka-PHP 확장 사용

위 내용은 Kafka 소개와 PHP 기반 Kafka 설치 및 테스트의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿