Golang Kafka 10으로 소비자 그룹 오프셋 가져오기
전통적으로 Kafka를 사용하여 Golang의 소비자 그룹 기능을 관리하기 위해 외부 라이브러리를 사용했습니다. 그러나 Kafka 10은 이제 기본적으로 이러한 기능을 제공합니다. 이는 Golang Kafka 라이브러리(sarama)를 사용하여 소비자 그룹이 처리한 현재 메시지 오프셋을 어떻게 검색할 수 있는지에 대한 질문을 제기합니다.
이전에는 kazoo-go를 사용하여 Zookeeper에서 그룹 메시지 오프셋을 검색했습니다. sarama-cluster가 도입되면서 대체 접근 방식이 필요합니다.
솔루션
다음 코드 조각은 소비자 그룹 오프셋을 얻는 방법을 보여줍니다.
<code class="go">package main import ( "context" "log" "strings" "github.com/Shopify/sarama" ) func main() { groupName := "testgrp" topic := "topic_name" offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic) if err != nil { log.Fatal(err) } log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset) } type gcInfo struct { offset int64 } func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { g.offset = claim.InitialOffset() return nil } func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) { config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = false // we don't want to change consumer group offsets client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config) if err != nil { return 0, err } info := gcInfo{} if err := client.Consume(ctx, []string{topic}, &info); err != nil { return 0, err } return info.offset, nil }</code>
위 내용은 Sarama를 사용하여 Golang Kafka 10에서 소비자 그룹의 현재 메시지 오프셋을 검색하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!