Saya sedang menulis pengguna Kafka untuk projek API menggunakan perpustakaan php-rdkafka. Saya perlu mencari offset terakhir dalam topik dan mendapatkan nilai daripadanya untuk pemprosesan selanjutnya. Sebagai contoh, offset terakhir dalam topik = 5, maka saya perlu mendapatkan offset 5 dan menghantarnya melalui API sehingga offset baharu ditambahkan. Apa yang saya cuba jalankan:
$conf = new RdKafka\Conf(); $settings = [ 'socket.keepalive.enable' => true, 'log_level' => LOG_WARNING, 'enable.auto.offset.store' => 'true', 'auto.offset.reset' => 'earliest', 'enable.partition.eof' => 'false', 'enable.auto.commit' => 'false', 'max.poll.interval.ms' => 300000, 'session.timeout.ms' => 45000, 'group.id' => 'test-group', 'group.instance.id' => uniqid('', true), 'metadata.broker.list' => 'stat-kafka-1:9092,stat-kafka-2:9092,stat-kafka-3:9092', ]; foreach ($settings as $key => $value) { $conf->set($key, $value); } $topicName = 'userstatistics_12345'; $partition = 0; $topicPartition = new RdKafka\TopicPartition($topicName, $partition); $topicPartitionsWithOffsets = $consumer->getOffsetPositions([$topicPartition]); var_dump($topicPartitionsWithOffsets);
Tetapi ini mengembalikan hasil yang pelik dengan offset negatif
array(1) { [0]=> object(RdKafka\TopicPartition)#6 (4) { ["topic"]=> string(20) "userstatistics_12345" ["partition"]=> int(0) ["offset"]=> int(-1001) ["err"]=> int(0) } }
Walaupun sebenarnya offset terakhir pada masa ini ialah 59. Idea saya adalah untuk mendapatkan offset terakhir dan kemudian mendapatkan nilai menggunakan:
$consumer->assign([ new RdKafka\TopicPartition($topicName, $partition, $lastOffset) ]);
Saya juga tidak mahu menggunakan gelung while(true) untuk melakukan kerja skrip dengan cepat.
Itu sahaja. terima kasih.
Saya menemui jawapannya dan ia sangat berkesan untuk saya: