Sambung Kafka dengan Golang

WBOY
Lepaskan: 2024-09-06 22:30:32
asal
358 orang telah melayarinya

pengenalan

Jika anda perlu mengetahui asasKafka, seperti ciri utama, komponen dan kelebihannya, saya ada artikel yang membincangkannya di sini. Sila semak dan ikuti langkah-langkah sehingga anda menyelesaikan pemasanganKafkamenggunakanDockeruntuk meneruskan bahagian berikut.

Connect Kafka with Golang

Bersambung ke Kafka dengan Golang

Sama seperti contoh dalam artikel tentang menyambungkanKafkadenganNodeJS, kod sumber ini juga termasuk dua bahagian: memulakanpengeluaruntuk menghantarmesejkeKafkadan menggunakanpenggunauntuk melanggan mesej daripadatopik.

Saya akan memecahkan kod kepada bahagian yang lebih kecil untuk pemahaman yang lebih baik. Mula-mula, mari kita takrifkan nilai pembolehubah.

package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
Salin selepas log masuk

- Di sini, pakejgithub.com/confluentinc/confluent-kafka-go/kafkadigunakan untuk menyambung keKafka.

-brokerialah alamat hos; jika anda menggunakanZooKeeper, gantikan alamat hos dengan sewajarnya.

-groupIddantopikboleh ditukar mengikut keperluan.

Seterusnya ialah memulakan penerbit.

func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
Salin selepas log masuk

Kod di atas digunakan untuk menghantar tatasusunan mesej{"mesej 1", "mesej 2", "mesej 3"}kepada topik dan menggunakanpergi rutinuntuk mengulangi acara denganuntuk e := julat p.Events()dan mencetak hasil penghantaran, sama ada ia berjaya atau gagal.

Seterusnya ialah menciptapenggunauntukmelanggantopikdan menerimamesej.

func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }
Salin selepas log masuk

Akhir sekali, kerana ini adalah contoh mudah, panggil fungsi untuk menciptapengeluardanpenggunauntuk digunakan. Dalam senario dunia sebenar, penggunaanpengeluardanpenggunabiasanya dilakukan pada dua pelayan berbeza dalam sistemperkhidmatan mikro.

func main() { startProducer() startConsumer() }
Salin selepas log masuk

Connect Kafka with Golang

Selamat mengekod!


Jika anda mendapati kandungan ini membantu, sila lawati artikel asal di blog saya untuk menyokong pengarang dan meneroka kandungan yang lebih menarik.

Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang


Sesetengah siri yang mungkin anda rasa menarik:

  • NodeJS
  •  Bertindak balas
  • Doker 
  • Kubernetes

Atas ialah kandungan terperinci Sambung Kafka dengan Golang. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

sumber:dev.to
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan
Tentang kita Penafian Sitemap
Laman web PHP Cina:Latihan PHP dalam talian kebajikan awam,Bantu pelajar PHP berkembang dengan cepat!