Jika anda perlu mengetahui asasKafka, seperti ciri utama, komponen dan kelebihannya, saya ada artikel yang membincangkannya di sini. Sila semak dan ikuti langkah-langkah sehingga anda menyelesaikan pemasanganKafkamenggunakanDockeruntuk meneruskan bahagian berikut.
Sama seperti contoh dalam artikel tentang menyambungkanKafkadenganNodeJS, kod sumber ini juga termasuk dua bahagian: memulakanpengeluaruntuk menghantarmesejkeKafkadan menggunakanpenggunauntuk melanggan mesej daripadatopik.
Saya akan memecahkan kod kepada bahagian yang lebih kecil untuk pemahaman yang lebih baik. Mula-mula, mari kita takrifkan nilai pembolehubah.
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
- Di sini, pakejgithub.com/confluentinc/confluent-kafka-go/kafkadigunakan untuk menyambung keKafka.
-brokerialah alamat hos; jika anda menggunakanZooKeeper, gantikan alamat hos dengan sewajarnya.
-groupIddantopikboleh ditukar mengikut keperluan.
Seterusnya ialah memulakan penerbit.
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
Kod di atas digunakan untuk menghantar tatasusunan mesej{"mesej 1", "mesej 2", "mesej 3"}kepada topik dan menggunakanpergi rutinuntuk mengulangi acara denganuntuk e := julat p.Events()dan mencetak hasil penghantaran, sama ada ia berjaya atau gagal.
Seterusnya ialah menciptapenggunauntukmelanggantopikdan menerimamesej.
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }
Akhir sekali, kerana ini adalah contoh mudah, panggil fungsi untuk menciptapengeluardanpenggunauntuk digunakan. Dalam senario dunia sebenar, penggunaanpengeluardanpenggunabiasanya dilakukan pada dua pelayan berbeza dalam sistemperkhidmatan mikro.
func main() { startProducer() startConsumer() }
Selamat mengekod!
Jika anda mendapati kandungan ini membantu, sila lawati artikel asal di blog saya untuk menyokong pengarang dan meneroka kandungan yang lebih menarik.
Sesetengah siri yang mungkin anda rasa menarik:
Atas ialah kandungan terperinci Sambung Kafka dengan Golang. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!