如何使用 php 库从 Kafka 获取主题中的最后一个偏移量?
P粉763662390
P粉763662390 2023-09-10 11:00:45
0
1
403

我正在使用 php-rdkafka 库为 API 项目编写 Kafka 消费者。我需要找到主题中的最后一个偏移量并从中获取值以进行进一步处理。例如,主题中的最后一个偏移量 = 5,那么我需要获取偏移量 5 并通过 API 发送它,直到添加新的偏移量。我正在尝试运行的内容:

$conf = new RdKafka\Conf();

$settings = [
    'socket.keepalive.enable'  => true,
    'log_level'                => LOG_WARNING,
    'enable.auto.offset.store' => 'true',
    'auto.offset.reset'        => 'earliest',
    'enable.partition.eof'     => 'false',
    'enable.auto.commit'       => 'false',
    'max.poll.interval.ms'     => 300000,
    'session.timeout.ms'       => 45000,
    'group.id'                 => 'test-group',
    'group.instance.id'        => uniqid('', true),
    'metadata.broker.list'     => 'stat-kafka-1:9092,stat-kafka-2:9092,stat-kafka-3:9092',
];
foreach ($settings as $key => $value) {
    $conf->set($key, $value);
}

$topicName = 'userstatistics_12345';
$partition = 0;

$topicPartition = new RdKafka\TopicPartition($topicName, $partition);

$topicPartitionsWithOffsets = $consumer->getOffsetPositions([$topicPartition]);

var_dump($topicPartitionsWithOffsets);

但这会返回带有负偏移量的奇怪结果

array(1) { [0]=> object(RdKafka\TopicPartition)#6 (4) { ["topic"]=> string(20) "userstatistics_12345" ["partition"]=> int(0) ["offset"]=> int(-1001) ["err"]=> int(0) } }

虽然实际上当前的最后一个偏移量是59。我的想法是获取最后一个偏移量,然后使用以下方法获取值:

$consumer->assign([
    new RdKafka\TopicPartition($topicName, $partition, $lastOffset)
]);

我也不想使用 while(true) 循环来快速执行脚本工作。

仅此而已。谢谢。

P粉763662390
P粉763662390

Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage
Über uns Haftungsausschluss Sitemap
Chinesische PHP-Website:Online-PHP-Schulung für das Gemeinwohl,Helfen Sie PHP-Lernenden, sich schnell weiterzuentwickeln!