If you need to know the basics of Kafka, such as its key features, components, and advantages, I have an article covering that here. Please review it and follow the steps to complete the Kafka installation using Docker before proceeding with the following sections.
Similar to the example in the article about connecting Kafka with NodeJS, this source code also includes two parts: initializing a producer to send messages to Kafka, and using a consumer to subscribe to messages from a topic.
I'll break down the code into smaller parts for better understanding. First, let's define the variable values.
```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
	broker  = "localhost:9092"
	groupId = "group-id"
	topic   = "topic-name"
)
```
- Here, the package github.com/confluentinc/confluent-kafka-go/kafka is used to connect to Kafka.
- broker is the address of the Kafka broker (the bootstrap.servers value); if your Kafka instance runs on a different host or port, replace the address accordingly.
- groupId and topic can be changed as needed.
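If the topic does not exist yet, the example relies on the broker's auto-topic-creation setting. A safer option is to create it explicitly with the library's AdminClient. The snippet below is a minimal sketch, not part of the original example: it assumes the broker and topic variables defined above, uses illustrative partition and replication values, and needs the context and time packages added to the import block.

```go
// ensureTopic creates the topic up front so the producer and consumer don't
// depend on the broker's auto-creation setting. Partition and replication
// values here are illustrative; adjust them for your cluster.
func ensureTopic() {
	admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		panic(err)
	}
	defer admin.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	results, err := admin.CreateTopics(ctx, []kafka.TopicSpecification{{
		Topic:             topic,
		NumPartitions:     1,
		ReplicationFactor: 1,
	}})
	if err != nil {
		panic(err)
	}
	for _, r := range results {
		fmt.Printf("Topic %s: %s\n", r.Topic, r.Error.String())
	}
}
```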
Next is initializing the producer.
```go
func startProducer() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		panic(err)
	}

	// Delivery reports arrive on the Events channel; handle them in a
	// separate goroutine so Produce calls never block.
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	for _, word := range []string{"message 1", "message 2", "message 3"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait (up to 15s) for outstanding messages to be delivered before returning.
	p.Flush(15 * 1000)
}
```
The above code sends the array of messages {"message 1", "message 2", "message 3"} to a topic and uses a goroutine to iterate through events with for e := range p.Events(), printing each delivery result, whether success or failure.
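As an alternative to the shared Events channel, the library also lets you pass a dedicated delivery channel to Produce and wait for the report synchronously. This is a minimal sketch of that pattern, not part of the original example; the helper name produceAndWait is just for illustration.

```go
// produceAndWait sends a single message and waits for its delivery report on
// a dedicated channel instead of the shared p.Events() channel.
func produceAndWait(p *kafka.Producer, value string) error {
	deliveryChan := make(chan kafka.Event, 1)

	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(value),
	}, deliveryChan)
	if err != nil {
		return err
	}

	// The delivery report for exactly this message arrives on deliveryChan.
	e := <-deliveryChan
	m := e.(*kafka.Message)
	return m.TopicPartition.Error
}
```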
Next is creating a consumer to subscribe to the topic and receive messages.
```go
func startConsumer() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": broker,
		"group.id":          groupId,
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}

	// Subscribe to the topic; messages are pulled in the loop below.
	if err := c.Subscribe(topic, nil); err != nil {
		panic(err)
	}

	for {
		// A timeout of -1 blocks until a message arrives or an error occurs.
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
			break
		}
	}

	c.Close()
}
```
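In a long-running service you usually want the consumer to stop cleanly on SIGINT or SIGTERM rather than looping forever. Below is a minimal sketch of that pattern, assuming the same consumer configuration as above; the signal handling and the consumeUntilSignal helper are additions for illustration and require the os, os/signal, syscall, and time imports.

```go
// consumeUntilSignal reads messages until the process receives SIGINT or
// SIGTERM, then closes the consumer cleanly.
func consumeUntilSignal(c *kafka.Consumer) {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	for {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			c.Close()
			return
		default:
			// Short timeout so the loop regularly checks for signals.
			msg, err := c.ReadMessage(100 * time.Millisecond)
			if err != nil {
				// A timed-out read just means no message arrived in this interval.
				if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
					continue
				}
				fmt.Printf("Consumer error: %v\n", err)
				continue
			}
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		}
	}
}
```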
Finally, since this is a simple example, call the functions to create the producer and consumer. In a real-world scenario, the producer and consumer are typically deployed on two different servers in a microservices system.
```go
func main() {
	startProducer()
	startConsumer()
}
```
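If you want to mimic that separation while keeping a single binary, one simple option is to choose the role with a command-line flag. The variant of main below is just an illustrative sketch using the standard flag package, not part of the original example.

```go
// Pick the role at startup, e.g. `go run . -mode=producer` or `go run . -mode=consumer`.
func main() {
	mode := flag.String("mode", "producer", "role to run: producer or consumer")
	flag.Parse()

	switch *mode {
	case "producer":
		startProducer()
	case "consumer":
		startConsumer()
	default:
		fmt.Printf("unknown mode: %s\n", *mode)
	}
}
```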
Happy coding!
If you found this content helpful, please visit the original article on my blog to support the author and explore more interesting content.