Bagaimana untuk mendapatkan offset terakhir dalam topik dari Kafka menggunakan perpustakaan php?
P粉763662390
P粉763662390 2023-09-10 11:00:45
0
1
476

Saya sedang menulis pengguna Kafka untuk projek API menggunakan perpustakaan php-rdkafka. Saya perlu mencari offset terakhir dalam topik dan mendapatkan nilai daripadanya untuk pemprosesan selanjutnya. Sebagai contoh, offset terakhir dalam topik = 5, maka saya perlu mendapatkan offset 5 dan menghantarnya melalui API sehingga offset baharu ditambahkan. Apa yang saya cuba jalankan:

$conf = new RdKafka\Conf(); $settings = [ 'socket.keepalive.enable' => true, 'log_level' => LOG_WARNING, 'enable.auto.offset.store' => 'true', 'auto.offset.reset' => 'earliest', 'enable.partition.eof' => 'false', 'enable.auto.commit' => 'false', 'max.poll.interval.ms' => 300000, 'session.timeout.ms' => 45000, 'group.id' => 'test-group', 'group.instance.id' => uniqid('', true), 'metadata.broker.list' => 'stat-kafka-1:9092,stat-kafka-2:9092,stat-kafka-3:9092', ]; foreach ($settings as $key => $value) { $conf->set($key, $value); } $topicName = 'userstatistics_12345'; $partition = 0; $topicPartition = new RdKafka\TopicPartition($topicName, $partition); $topicPartitionsWithOffsets = $consumer->getOffsetPositions([$topicPartition]); var_dump($topicPartitionsWithOffsets);

Tetapi ini mengembalikan hasil yang pelik dengan offset negatif

array(1) { [0]=> object(RdKafka\TopicPartition)#6 (4) { ["topic"]=> string(20) "userstatistics_12345" ["partition"]=> int(0) ["offset"]=> int(-1001) ["err"]=> int(0) } }

Walaupun sebenarnya offset terakhir pada masa ini ialah 59. Idea saya adalah untuk mendapatkan offset terakhir dan kemudian mendapatkan nilai menggunakan:

$consumer->assign([ new RdKafka\TopicPartition($topicName, $partition, $lastOffset) ]);

Saya juga tidak mahu menggunakan gelung while(true) untuk melakukan kerja skrip dengan cepat.

Itu sahaja. terima kasih.

P粉763662390
P粉763662390

membalas semua (1)
P粉701491897

Saya menemui jawapannya dan ia sangat berkesan untuk saya:

$conf = new RdKafka\Conf(); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set('group.id', 'test-group'); // Initial list of Kafka brokers $conf->set('metadata.broker.list', 'kafka-1:9092'); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'earliest': start from the beginning $conf->set('auto.offset.reset', 'latest'); // Emit EOF event when reaching the end of a partition $conf->set('enable.partition.eof', 'true'); $kafkaConsumer = new RdKafka\KafkaConsumer($conf); $topicName = 'topic_name'; $partition = 0; $topicPartition = new RdKafka\TopicPartition($topicName, 0); $timeoutMs = 100000; $low = null; $high = null; $wm = $kafkaConsumer->queryWatermarkOffsets($topicName,$partition,$low,$high,$timeoutMs); $offset = $high - 1; $kafkaConsumer->assign([new RdKafka\TopicPartition($topicName, $partition, $offset)]); $message = $kafkaConsumer->consume(1000); if ($message !== null) { // Process the message $payload = $message->payload; echo "Message at offset $offset: $payload\n"; } $kafkaConsumer->close();
    Muat turun terkini
    Lagi>
    kesan web
    Kod sumber laman web
    Bahan laman web
    Templat hujung hadapan
    Tentang kita Penafian Sitemap
    Laman web PHP Cina:Latihan PHP dalam talian kebajikan awam,Bantu pelajar PHP berkembang dengan cepat!