Using Apache Kafka in Go: A Complete Guide

WBOY
Release: 2023-06-17 12:21:07

Apache Kafka is a distributed messaging system based on the publish-subscribe model. It provides reliable, efficient, and scalable message delivery and is widely used in big data, real-time stream processing, log collection, and similar fields. Go is a fast, compiled language with built-in concurrency support, which makes it well suited to message passing and processing in high-concurrency scenarios. In this article, we'll cover how to use Apache Kafka for messaging in Go, with a step-by-step guide and code examples.

Step 1: Install and Configure Apache Kafka

First, we need to install and configure Apache Kafka. Download a Kafka release from the official website, unpack it, and start ZooKeeper, which this Kafka version uses for cluster coordination:

$ tar -xzf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Then, in a second terminal in the same directory, start the Kafka broker:

$ bin/kafka-server-start.sh config/server.properties

Next, we need to create a Kafka topic, which is used for storing and delivering messages:

$ bin/kafka-topics.sh --create --topic my_topic \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1

This command creates a topic named "my_topic" with a replication factor of 1 and a single partition on the local broker.
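
Before moving on, it can help to confirm that the broker started above is listening. The following is a minimal sketch using only the Go standard library; the helper name brokerReachable and the dialTimeout constant are made up for this illustration. It only checks that the TCP port accepts connections, not that Kafka itself is healthy.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// dialTimeout bounds how long a single connection attempt may take.
// The value is an arbitrary choice for this sketch.
const dialTimeout = 2 * time.Second

// brokerReachable reports whether a TCP connection to addr can be opened
// within timeout. It does not speak the Kafka protocol.
func brokerReachable(addr string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

func main() {
	addr := "localhost:9092" // broker address used in the commands above
	if brokerReachable(addr, dialTimeout) {
		fmt.Printf("broker port %s is accepting connections\n", addr)
	} else {
		fmt.Printf("broker port %s is not reachable yet\n", addr)
	}
}
```

If the port is not reachable, check that both ZooKeeper and the Kafka broker are still running in their terminals.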

Step 2: Install the Kafka Go library

To use Kafka from Go, we need a third-party client library. The Go standard library does not include a Kafka client, but the community libraries are mature and stable.

In this article, we will use the sarama library. Install it with the following command:

$ go get github.com/Shopify/sarama

We will import the sarama package and use two of its APIs, the producer and the consumer, for message passing.

Step 3: Use the producer API to send messages

It is very simple to use the Kafka producer API to send messages in the Go language. First, we need to create a Kafka producer object:

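package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    // A SyncProducer requires successes to be returned; without this,
    // NewSyncProducer fails with a configuration error.
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        log.Fatalf("Failed to create producer: %s", err)
    }
    defer producer.Close()
}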

Here, we use the NewSyncProducer() function in the sarama package to create a synchronous producer, passing the Kafka broker address and the configuration. A SyncProducer requires config.Producer.Return.Successes to be true; otherwise NewSyncProducer() returns a configuration error. After the producer is created, the defer statement ensures it is closed when main() returns.
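
The broker address is hard-coded in the snippet above. As a rough sketch of one alternative, the list could be read from an environment variable. The variable name KAFKA_BROKERS and the helper parseBrokers are assumptions made for this example, not part of sarama; the result is a []string that can be passed as the first argument to NewSyncProducer() or NewConsumer().

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// defaultBroker is the local address used throughout this article.
const defaultBroker = "127.0.0.1:9092"

// parseBrokers splits a comma-separated list of host:port entries,
// trims whitespace, drops empty entries, and falls back to the local
// broker when nothing usable is given.
func parseBrokers(value string) []string {
	var brokers []string
	for _, part := range strings.Split(value, ",") {
		if addr := strings.TrimSpace(part); addr != "" {
			brokers = append(brokers, addr)
		}
	}
	if len(brokers) == 0 {
		return []string{defaultBroker}
	}
	return brokers
}

func main() {
	// KAFKA_BROKERS is a hypothetical variable name chosen for this sketch.
	brokers := parseBrokers(os.Getenv("KAFKA_BROKERS"))
	fmt.Println("using brokers:", brokers)
}
```

Listing more than one broker only matters for multi-node clusters; for the single local broker in this guide the default is enough.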

Next, we can use the SendMessage() method to send a message to the Kafka topic:

msg := &sarama.ProducerMessage{
    Topic: "my_topic",
    Value: sarama.StringEncoder("hello, kafka"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Fatalf("Failed to send message: %s", err)
}
log.Printf("Message sent to partition %d at offset %d", partition, offset)

Here, we first create a sarama.ProducerMessage, setting the topic name and the message content. The message is then sent to the target topic with the producer's SendMessage() method, which returns the partition and offset at which the message was stored.
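
The example above sends a plain string. For structured payloads, one common approach is to serialize to JSON first. The sketch below uses only the standard library; the Event type and encodeEvent helper are hypothetical names invented for this illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical payload type used only in this sketch.
type Event struct {
	ID   int    `json:"id"`
	Body string `json:"body"`
}

// encodeEvent serializes an Event to JSON bytes suitable for use as a
// Kafka message value.
func encodeEvent(e Event) ([]byte, error) {
	return json.Marshal(e)
}

func main() {
	payload, err := encodeEvent(Event{ID: 1, Body: "hello, kafka"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(payload))
}
```

With sarama, the resulting bytes can be wrapped as sarama.ByteEncoder(payload) and assigned to the Value field of the ProducerMessage in place of sarama.StringEncoder.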

Step 4: Use the consumer API to receive messages from the topic

It is also very simple to use the Kafka consumer API to receive messages in the Go language. First, we need to create a Kafka consumer object:

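config := sarama.NewConfig()
// Deliver consume errors on the Errors() channel instead of only logging them.
config.Consumer.Return.Errors = true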
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config)
if err != nil {
    log.Fatalf("Failed to create consumer: %s", err)
}
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
if err != nil {
    log.Fatalf("Failed to consume partition: %s", err)
}
defer partitionConsumer.Close()

Here, we use the NewConsumer() function in the sarama package to create a consumer object and establish a connection with the Kafka server. After successful creation, you need to use the defer statement to ensure that the consumer object is closed after the program ends.

Next, we use the ConsumePartition() method to consume a specific topic and partition, starting from a given offset. Here partition 0 is read from sarama.OffsetOldest, the oldest offset still available on the broker. The method returns a PartitionConsumer, which we also close with a defer statement.

Finally, we can read from the PartitionConsumer's Messages() channel in a for loop to receive and process messages:

for {
    select {
    case msg := <-partitionConsumer.Messages():
        log.Printf("Received message: %s", string(msg.Value))
    case err := <-partitionConsumer.Errors():
        log.Fatalf("Error while consuming: %s", err)
    }
}

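Here, Messages() returns a channel of messages from the PartitionConsumer, and the for loop processes each message as it arrives. Because messages and errors arrive on separate channels, a select statement is used to wait on both at once. Note that sarama only delivers errors on Errors() when config.Consumer.Return.Errors is true; by default they are logged instead. A PartitionConsumer does not track or commit offsets, and there is no Ack() call: every run of this program starts again from the offset passed to ConsumePartition(). To have consumed offsets committed to Kafka, use sarama's ConsumerGroup API and mark each processed message with the session's MarkMessage() method.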
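
The loop above never exits on its own. The following sketch shows one way to add a shutdown path to the same select pattern. To keep it runnable without a broker, plain Go channels stand in for the channels returned by Messages() and Errors(); the consumeLoop function and the done channel are assumptions made for this example, and with sarama the element types would be *sarama.ConsumerMessage and *sarama.ConsumerError instead.

```go
package main

import "fmt"

// consumeLoop handles messages until done is closed, msgs is closed,
// or an error arrives on errs. It returns the first error received,
// or nil on a clean shutdown.
func consumeLoop(done <-chan struct{}, msgs <-chan []byte, errs <-chan error, handle func([]byte)) error {
	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				return nil // message channel closed: nothing more to read
			}
			handle(msg)
		case err, ok := <-errs:
			if ok {
				return err
			}
			errs = nil // a nil channel blocks forever, disabling this case
		case <-done:
			return nil // shutdown requested by the caller
		}
	}
}

func main() {
	// In-memory stand-ins for the consumer's channels.
	msgs := make(chan []byte, 2)
	errs := make(chan error)
	done := make(chan struct{})

	msgs <- []byte("hello, kafka")
	msgs <- []byte("second message")
	close(msgs)

	err := consumeLoop(done, msgs, errs, func(value []byte) {
		fmt.Printf("Received message: %s\n", value)
	})
	fmt.Println("loop finished, err =", err)
}
```

In a real program, done would typically be closed when the process receives an interrupt signal, so that the deferred Close() calls get a chance to run.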

Full code example

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
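    config := sarama.NewConfig()
    // Required by SyncProducer: report successful sends back to the caller.
    config.Producer.Return.Successes = true
    // Deliver consume errors on the Errors() channel instead of only logging them.
    config.Consumer.Return.Errors = true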
    producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        log.Fatalf("Failed to create producer: %s", err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "my_topic",
        Value: sarama.StringEncoder("hello, kafka"),
    }
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Fatalf("Failed to send message: %s", err)
    }
    log.Printf("Message sent to partition %d at offset %d", partition, offset)

    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        log.Fatalf("Failed to create consumer: %s", err)
    }
    defer consumer.Close()

    partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
    if err != nil {
        log.Fatalf("Failed to consume partition: %s", err)
    }
    defer partitionConsumer.Close()

    for {
        select {
        case msg := <-partitionConsumer.Messages():
            log.Printf("Received message: %s", string(msg.Value))
            // A PartitionConsumer does not commit offsets, so there is nothing to mark here.
        case err := <-partitionConsumer.Errors():
            log.Fatalf("Error while consuming: %s", err)
        }
    }
}

Summary

In this article, we showed how to use Apache Kafka for messaging in Go, covering installation and configuration, adding the client library, and the producer and consumer code. Kafka is an efficient and reliable messaging system that is widely used in big data, real-time stream processing, log collection, and similar scenarios. When using Kafka with sarama, keep a few key points in mind: a SyncProducer needs Producer.Return.Successes enabled, messages and errors arrive on separate channels, and a PartitionConsumer does not commit offsets, so use a consumer group when consumption progress must be tracked. I hope this article helps you write high-concurrency, distributed programs with Kafka and Go.
