Heim > Backend-Entwicklung > Golang > Wie rufe ich Verbrauchergruppen-Offsets in Golang Kafka 10 mithilfe von „sarama-cluster' ab?

Wie rufe ich Verbrauchergruppen-Offsets in Golang Kafka 10 mithilfe von „sarama-cluster' ab?

Patricia Arquette
Freigeben: 2024-10-26 02:22:27
Original
638 Leute haben es durchsucht

How to Retrieve Consumer Group Offsets in Golang Kafka 10 Using `sarama-cluster`?

So rufen Sie Verbrauchergruppen-Offsets in Golang Kafka 10 ab

Zuvor wurden externe Bibliotheken wie kazoo-go zum Abrufen von Verbrauchergruppennachrichten verwendet in Zookeeper gespeicherte Offsets. Mit der Einführung der Verbrauchergruppenfunktion in der Golang Kafka-Bibliothek (sarama) in Version 10 ist es jedoch möglich, direkt auf diese Offsets zuzugreifen.

Verwendung von sarama-cluster

So erhalten Sie Consumer-Gruppen-Offsets mithilfe der sarama-cluster-Bibliothek:

<code class="go">import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>
Nach dem Login kopieren

Dieser Code erstellt eine neue ConsumerGroup-Instanz und ruft den aktuellen Offset ab, indem er eine Nachricht aus dem angegebenen Thema mithilfe eines leeren Handlers (consumeClaim) konsumiert. Der anfängliche Ausgleich des Anspruchs wird dann in der gcInfo-Struktur erfasst.

Das obige ist der detaillierte Inhalt vonWie rufe ich Verbrauchergruppen-Offsets in Golang Kafka 10 mithilfe von „sarama-cluster' ab?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Neueste Artikel des Autors
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage