Kafka の主な機能、コンポーネント、利点などの基本を知る必要がある場合は、ここで説明した記事を参照してください。 Docker を使用して Kafka のインストールが完了するまで内容を確認し、次のセクションに進んでください。
Kafka と NodeJS の接続に関する記事の例と同様に、このソース コードには、プロデューサー メッセージをKafkaに送信し、コンシューマを使用してトピック. 理解を助けるために、コードを小さな部分に分割します。まず、変数の値を定義しましょう。
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
を使用して Kafka に接続します。 -
ブローカーはホスト アドレスです。 ZooKeeper を使用している場合は、ホスト アドレスを適宜置き換えてください。 -
groupIdと topic は、必要に応じて変更できます。 次はプロデューサーを初期化します。
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
{"message 1", "message 2", "message 3"} をトピックに送信するために使用され、go-routine は、for e := range p.Events() を使用してイベントを反復処理し、配信結果を出力します。成功か失敗か。 次に、
トピックにサブスクライブし、メッセージを受信するコンシューマを作成します。
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }
と コンシューマー を作成する関数を呼び出します。実際のシナリオでは、プロデューサー と コンシューマー のデプロイメントは通常、マイクロサービス システム内の 2 つの異なるサーバーで行われます。
func main() { startProducer() startConsumer() }
コーディングを楽しんでください!
このコンテンツが役立つと思われた場合は、著者をサポートし、より興味深いコンテンツを探索するために、私のブログの元の記事にアクセスしてください。
興味深いと思われるシリーズ:
NodeJS
以上がKafka と Golang を接続するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。