Connect Kafka with Golang

WBOY
Release: 2024-09-06 22:30:32

Introduction

If you need to know the basics of Kafka, such as its key features, components, and advantages, I have an article covering that here. Please review it and follow its steps through the Kafka installation using Docker before continuing with the sections below.


Connecting to Kafka with Golang

As in the example from the article about connecting Kafka with NodeJS, this source code has two parts: initializing a producer to send messages to Kafka, and using a consumer to subscribe to messages from a topic.

I'll break down the code into smaller parts for better understanding. First, let's define the variable values.

    package main

    import (
        "fmt"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    var (
        broker  = "localhost:9092"
        groupId = "group-id"
        topic   = "topic-name"
    )

- Here, the package github.com/confluentinc/confluent-kafka-go/kafka is used to connect to Kafka.

- The broker is the host address of the Kafka broker; if your setup differs (for example, a cluster managed with ZooKeeper), replace it with the address at which your broker is reachable.

- The groupId and topic can be changed as needed.

Next is initializing the producer.

    func startProducer() {
        p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
        if err != nil {
            panic(err)
        }
        defer p.Close()

        // Print delivery reports in the background.
        go func() {
            for e := range p.Events() {
                switch ev := e.(type) {
                case *kafka.Message:
                    if ev.TopicPartition.Error != nil {
                        fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                    } else {
                        fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                    }
                }
            }
        }()

        for _, word := range []string{"message 1", "message 2", "message 3"} {
            err := p.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          []byte(word),
            }, nil)
            if err != nil {
                fmt.Printf("Produce failed: %v\n", err)
            }
        }

        // Wait up to 15 seconds for outstanding deliveries before returning.
        p.Flush(15 * 1000)
    }

The code above sends the messages {"message 1", "message 2", "message 3"} to a topic and uses a goroutine to iterate through events with for e := range p.Events(), printing the delivery result for each message, whether it succeeded or failed.

Next is creating a consumer to subscribe to the topic and receive messages.

    func startConsumer() {
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers": broker,
            "group.id":          groupId,
            "auto.offset.reset": "earliest",
        })
        if err != nil {
            panic(err)
        }
        defer c.Close()

        if err := c.Subscribe(topic, nil); err != nil {
            panic(err)
        }

        for {
            // A timeout of -1 blocks until a message or an error arrives.
            msg, err := c.ReadMessage(-1)
            if err != nil {
                fmt.Printf("Consumer error: %v (%v)\n", err, msg)
                break
            }
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        }
    }

Finally, since this is a simple example, main calls both functions to create the producer and the consumer. In a real-world scenario, the producer and consumer are typically deployed on two different servers in a microservices system.

    func main() {
        startProducer()
        startConsumer()
    }
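
If the same binary is to be deployed separately as a producer and as a consumer, one possible approach is to choose the role at startup. The sketch below uses only the standard flag package; the -role flag and the parseRole helper are assumptions made for illustration and do not appear in the original program. The print statements mark where startProducer() and startConsumer() would be called.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// parseRole reads a hypothetical -role flag from args and validates it.
// The default "both" mirrors the article's main, which runs both parts.
func parseRole(args []string) (string, error) {
	fs := flag.NewFlagSet("kafka-example", flag.ContinueOnError)
	role := fs.String("role", "both", "producer, consumer, or both")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	switch *role {
	case "producer", "consumer", "both":
		return *role, nil
	}
	return "", fmt.Errorf("unknown role %q", *role)
}

func main() {
	role, err := parseRole(os.Args[1:])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
	// In the article's program these branches would call
	// startProducer() and startConsumer() respectively.
	if role == "producer" || role == "both" {
		fmt.Println("would start producer")
	}
	if role == "consumer" || role == "both" {
		fmt.Println("would start consumer")
	}
}
```

Running the program with -role producer on one server and -role consumer on another would then separate the two parts without maintaining two code bases.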


Happy coding!


If you found this content helpful, please visit the original article on my blog to support the author and explore more interesting content.



Some series you might find interesting:

  • NodeJS
  • React
  • Docker
  • Kubernetes


Source: dev.to