Heim > Backend-Entwicklung > Golang > Wie man Golang und Kafka zusammen verwendet

Wie man Golang und Kafka zusammen verwendet

PHPz
Freigeben: 2023-04-13 18:51:03
Original
1397 Leute haben es durchsucht

Kafka ist eine verteilte Open-Source-Nachrichtenwarteschlange, die häufig zum Erstellen von Echtzeit-Datenstromverarbeitungsanwendungen in Big-Data-Anwendungen verwendet wird. Golang ist eine von Google entwickelte Programmiersprache und bekannt für ihre effiziente Parallelität, leistungsstarken Bibliotheken und ihr Ökosystem. Wie kann man Golang also mit Kafka kombinieren?

Zuerst müssen wir das Paket github.com/Shopify/sarama importieren. Dies ist eine Golang-Clientbibliothek, die Kafka unterstützt. Während des Installationsprozesses müssen Sie den folgenden Befehl ausführen:

go get github.com/Shopify/sarama
Nach dem Login kopieren

Als nächstes müssen wir einen Produzenten erstellen. Erstellen Sie zunächst die Konfiguration:

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
Nach dem Login kopieren

Hier stellen wir den Produzenten so ein, dass er auf alle ACKs wartet, bis zu 5 Wiederholungsversuche durchführt und nach Erfolg eine Erfolgsmeldung an den Produzenten zurücksendet.

Als nächstes müssen wir eine Produzenteninstanz erstellen:

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    panic(err)
}
defer producer.Close()
Nach dem Login kopieren

Wir müssen eine Kafka-Brokeradresse als Dienstendpunkt angeben, um eine Verbindung zu Kafka herzustellen. Hier stellen wir eine Verbindung zum lokalen Kafka-Server her. Wir rufen auch die Methode .Close() auf, um sicherzustellen, dass der Produzent beim Beenden bereinigt. .Close()方法,以确保生产者退出时会清理。

现在我们已经准备好了开始向Kafka主题发布消息:

msg := &sarama.ProducerMessage{
    Topic: "test",
    Value: sarama.StringEncoder("Hello World!"),
}

part, offset, err := producer.SendMessage(msg)
if err != nil {
    fmt.Printf("Error publishing message: %v", err)
} else {
    fmt.Printf("Message published successfully. Partition: %v, Offset: %v\n", part, offset)
}
Nach dem Login kopieren

在这个例子中,我们发布了一个消息到名为“test”的主题中。如果没有错误,它会打印出成功发布的分区和偏移量。

现在我们已经创建了一个生产者,向Kafka发布了一条消息。接下来,我们来看一下如何创建一个消费者。

首先,我们需要创建消费者配置:

config := sarama.NewConfig()
config.Consumer.Return.Errors = true
Nach dem Login kopieren

此处我们设定了接收错误。

接下来,我们需要创建一个消费者实例:

consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
    panic(err)
}
defer consumer.Close()
Nach dem Login kopieren

这里我们同样指定了一个Kafka broker地址。我们还需要调用.Close()方法来确保消费者退出时会清理。

现在我们已经准备好读取Kafka主题的消息:

partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
    panic(err)
}
defer partitionConsumer.Close()

for {
    select {
    case msg := <-partitionConsumer.Messages():
        fmt.Printf("Received message from partition %d with offset %d: %s = %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
    case err := <-partitionConsumer.Errors():
        fmt.Println("Error: ", err.Error())
    }
}
Nach dem Login kopieren

在这个例子中,我们订阅了名为“test”的主题。然后我们读取第一个分区的偏移量。我们然后在一个循环中无限读取来自该分区的消息。循环中的select

Jetzt können wir mit der Veröffentlichung von Nachrichten zum Kafka-Thema beginnen:

rrreee

In diesem Beispiel veröffentlichen wir eine Nachricht zum Thema „test“. Wenn keine Fehler vorliegen, werden die erfolgreich veröffentlichte Partition und der Offset ausgegeben. 🎜🎜Jetzt haben wir einen Produzenten geschaffen, der eine Botschaft an Kafka veröffentlicht. Schauen wir uns als Nächstes an, wie man einen Verbraucher erstellt. 🎜🎜Zuerst müssen wir die Verbraucherkonfiguration erstellen: 🎜rrreee🎜Hier legen wir den Empfangsfehler fest. 🎜🎜Als nächstes müssen wir eine Consumer-Instanz erstellen: 🎜rrreee🎜Hier geben wir auch eine Kafka-Broker-Adresse an. Wir müssen auch die Methode .Close() aufrufen, um sicherzustellen, dass der Verbraucher beim Beenden aufräumt. 🎜🎜Jetzt sind wir bereit, Nachrichten aus dem Kafka-Thema zu lesen: 🎜rrreee🎜In diesem Beispiel abonnieren wir das Thema mit dem Namen „test“. Dann lesen wir den Offset der ersten Partition. Anschließend lesen wir Nachrichten von dieser Partition in einer Endlosschleife. Die select-Anweisung in der Schleife überwacht immer die Nachrichten- und Fehlerkanäle und gibt sie entsprechend aus. 🎜🎜Bisher haben wir vorgestellt, wie man Golang und Kafka kombiniert. Mit diesem einfachen Beispiel sollten Sie die grundlegende Verwendung von Golang und Kafka beherrschen. 🎜

Das obige ist der detaillierte Inhalt vonWie man Golang und Kafka zusammen verwendet. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage