golangでkafkaを実装する

王林
リリース: 2023-05-10 13:18:37
オリジナル
1565 人が閲覧しました

エンタープライズ レベルのアプリケーション アーキテクチャがますます複雑になるにつれて、メッセージ送信が重要なコンポーネントになっています。ここでカフカが登場します。 Kafka は、メッセージのパブリッシュとサブスクリプションをサポートする効率的で信頼性の高い分散メッセージ キューであり、非常に高いスループットと低い遅延を備えた最新のエンタープライズ レベルのメッセージング システムです。 Kafka の API では、公式クライアントが複数の言語を提供していますが、近年 Golang が広く使われるようになってきたため、この記事では実装言語として Golang を使用して、Golang を使用して Kafka を実装する方法を説明します。

1. 依存関係

開始する前に、必要な依存関係をダウンロードする必要があります:

  • sarama: Golang Kafka client library
  • pkg /エラー: Go 標準ライブラリのエラー パッケージをカプセル化します。

具体的な使用方法は次のとおりです。

go get github.com/Shopify/sarama
go get github. com/ pkg/errors

2. プロデューサーの作成

Kafka の API を導入する前に、最初にプロデューサー インスタンスを作成する必要があります。プロデューサーのコードは次のとおりです:

package main

import (
    "fmt"
    "time"

    "github.com/pkg/errors"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create producer"))
    }
    defer producer.Close()

    for i := 0; i < 10; i++ {
        message := &sarama.ProducerMessage{
            Topic: "test_topic",
            Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)),
        }
        partition, offset, err := producer.SendMessage(message)
        if err != nil {
            fmt.Println(errors.Wrapf(err, "failed to send message: %s", message))
        } else {
            fmt.Printf("message sent to partition %d at offset %d
", partition, offset)
        }

        time.Sleep(500 * time.Millisecond) // 延迟发送
    }
}
ログイン後にコピー

コードは主に次のことを行います:

  • プロデューサーの構成: プロデューサーの構成を設定し、パーティショニング方法をランダムとして指定します。パーティショニングでは、すべての ISR ノードがメッセージを確認してから返すのを待ち、送信が成功した後にパーティションとオフセットを返す必要があります。
  • プロデューサーの作成: 指定されたブローカー アドレスと構成を使用してプロデューサー インスタンスを作成します。
  • メッセージの送信: メッセージの件名と内容を含むメッセージを作成し、送信します。
  • 出力結果: 結果の印刷、メッセージのパーティションとオフセットの記録。

3. コンシューマの作成

2 番目に、コンシューマ インスタンスを作成する必要があります。コンシューマ コードは次のとおりです。

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
    "github.com/pkg/errors"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create consumer"))
    }
    defer consumer.Close()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    partitions, err := consumer.Partitions("test_topic")
    if err != nil {
        panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic"))
    }

    ctx, cancel := context.WithCancel(context.Background())

    for _, partition := range partitions {
        go func(partition int32) {
            partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest)
            if err != nil {
                fmt.Printf("failed to create partition consumer for partition %d: %s
", partition, err)
                return
            }
            defer partitionConsumer.Close()

            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    fmt.Printf("Consumed message from partition %d at offset %d: %s
", msg.Partition, msg.Offset, msg.Value)
                case <-signals:
                    cancel()
                    return
                case err := <-partitionConsumer.Errors():
                    fmt.Printf("Consumed error from partition %d: %s
", partition, err)
                case <-ctx.Done():
                    return
                }
            }
        }(partition)
    }

    <-signals
    fmt.Println("Shutting down consumer")
}
ログイン後にコピー

コードは主に次のことを行います。

  • コンシューマの設定: コンシューマを設定し、エラー リターン スイッチを設定します。
  • コンシューマーの作成: 指定されたブローカーのアドレスと構成に基づいてコンシューマー インスタンスを作成します。
  • Get Partition: 指定されたトピックのパーティションを取得します。
  • 消費: 個別に消費するために各パーティションの goroutine を開きます。
  • 出力結果: 消費されたメッセージを出力します。

4. まとめ

上記では、Golang を使用して Kafka のプロデューサー部分とコンシューマー部分を実装しました。分散システムを実現するための重要なコンポーネントの 1 つとして、Kafka はメッセージを解決できます。システムには同時実行性の高い分散環境では問題がありますが、Kafka には優れたサポート ドキュメントと安定したコミュニティがあるため、実際の開発にストレスなく適用できます。

以上がgolangでkafkaを実装するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート