Mendapatkan Offset Kumpulan Pengguna dengan Golang Kafka 10
Secara tradisinya, perpustakaan luar digunakan untuk mengurus keupayaan kumpulan pengguna di Golang dengan Kafka. Walau bagaimanapun, Kafka 10 kini secara asli menyediakan fungsi sedemikian. Ini menimbulkan persoalan: bagaimanakah kita boleh mendapatkan semula offset mesej semasa yang diproses oleh kumpulan pengguna menggunakan perpustakaan Golang Kafka (sarama)?
Sebelum ini, kazoo-go telah digunakan untuk mendapatkan semula offset mesej kumpulan daripada Zookeeper. Dengan pengenalan kluster sarama, pendekatan alternatif diperlukan.
Penyelesaian
Coretan kod berikut menunjukkan cara mendapatkan offset kumpulan pengguna:
<code class="go">package main import ( "context" "log" "strings" "github.com/Shopify/sarama" ) func main() { groupName := "testgrp" topic := "topic_name" offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic) if err != nil { log.Fatal(err) } log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset) } type gcInfo struct { offset int64 } func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { g.offset = claim.InitialOffset() return nil } func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) { config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = false // we don't want to change consumer group offsets client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config) if err != nil { return 0, err } info := gcInfo{} if err := client.Consume(ctx, []string{topic}, &info); err != nil { return 0, err } return info.offset, nil }</code>
Atas ialah kandungan terperinci Bagaimana untuk Mengambil Offset Mesej Semasa Kumpulan Pengguna di Golang Kafka 10 Menggunakan Sarama?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!