Implémenter Kafka avec Golang
Avec la complexité croissante de l'architecture des applications au niveau de l'entreprise, la transmission des messages est devenue un élément crucial. C’est alors que Kafka apparaît. Kafka est une file d'attente de messages distribuée efficace et fiable qui prend en charge la publication et l'abonnement des messages. Il s'agit d'un système de messagerie d'entreprise moderne avec un débit très élevé et une faible latence. Dans l'API de Kafka, bien que le client officiel propose plusieurs langues, Golang est devenu de plus en plus largement utilisé ces dernières années, c'est pourquoi cet article utilise Golang comme langage d'implémentation pour expliquer comment utiliser Golang pour implémenter Kafka.
1. Dépendances
Avant de commencer, vous devez télécharger les dépendances requises :
- sarama : bibliothèque client Golang Kafka
- pkg/errors : encapsuler le package d'erreurs de la bibliothèque standard Go
Méthode d'utilisation spécifique Comme suit :
allez sur github.com/Shopify/sarama
allez sur github.com/pkg/errors
2. Créer un producteur
Avant d'introduire l'API de Kafka, vous devez d'abord créer une instance de producteur. Le code du producteur est le suivant :
package main import ( "fmt" "time" "github.com/pkg/errors" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create producer")) } defer producer.Close() for i := 0; i < 10; i++ { message := &sarama.ProducerMessage{ Topic: "test_topic", Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)), } partition, offset, err := producer.SendMessage(message) if err != nil { fmt.Println(errors.Wrapf(err, "failed to send message: %s", message)) } else { fmt.Printf("message sent to partition %d at offset %d ", partition, offset) } time.Sleep(500 * time.Millisecond) // 延迟发送 } }
Le code fait principalement les choses suivantes :
- Configurer le producteur : définissez la configuration du producteur, spécifiez la méthode de partitionnement comme partitionnement aléatoire, et attendez que tous les nœuds ISR confirmez le message, puis revenez et renvoyez la partition et le décalage après une transmission réussie.
- Créer un producteur : créez une instance de producteur avec l'adresse et la configuration du courtier spécifiées.
- Envoyer un message : créez un message avec le sujet et le contenu du message, puis envoyez-le.
- Résultats de sortie : résultats d'impression, partition d'enregistrement et décalage des messages.
3. Créer un consommateur
Deuxièmement, vous devez créer une instance de consommateur. Le code du consommateur est le suivant :
package main import ( "context" "fmt" "os" "os/signal" "github.com/Shopify/sarama" "github.com/pkg/errors" ) func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create consumer")) } defer consumer.Close() signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) partitions, err := consumer.Partitions("test_topic") if err != nil { panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic")) } ctx, cancel := context.WithCancel(context.Background()) for _, partition := range partitions { go func(partition int32) { partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest) if err != nil { fmt.Printf("failed to create partition consumer for partition %d: %s ", partition, err) return } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Consumed message from partition %d at offset %d: %s ", msg.Partition, msg.Offset, msg.Value) case <-signals: cancel() return case err := <-partitionConsumer.Errors(): fmt.Printf("Consumed error from partition %d: %s ", partition, err) case <-ctx.Done(): return } } }(partition) } <-signals fmt.Println("Shutting down consumer") }
Le code fait principalement les choses suivantes :
- Configurer le consommateur : configurez le consommateur et définissez le commutateur de retour d'erreur.
- Créer un consommateur : créez une instance de consommateur basée sur l'adresse et la configuration du courtier spécifiées.
- Obtenir la partition : obtenez la partition du sujet spécifié.
- Consommation : ouvrez une goroutine pour chaque partition pour une consommation séparée.
- Résultats de sortie : imprimez les messages consommés.
IV.Résumé
Ci-dessus, nous avons utilisé Golang pour implémenter les parties producteur et consommateur de Kafka. En tant que l'un des composants importants de la mise en œuvre de systèmes distribués, Kafka peut résoudre le problème des systèmes de messagerie existant dans les environnements distribués et à haute concurrence. problèmes, et Kafka dispose également d'une bonne documentation de support et d'une communauté stable, ce qui rend son application sans stress dans le développement réel.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Outils d'IA chauds

Undress AI Tool
Images de déshabillage gratuites

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Clothoff.io
Dissolvant de vêtements AI

Video Face Swap
Échangez les visages dans n'importe quelle vidéo sans effort grâce à notre outil d'échange de visage AI entièrement gratuit !

Article chaud

Outils chauds

Bloc-notes++7.3.1
Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise
Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1
Puissant environnement de développement intégré PHP

Dreamweaver CS6
Outils de développement Web visuel

SublimeText3 version Mac
Logiciel d'édition de code au niveau de Dieu (SublimeText3)

La façon la plus efficace d'écrire un kubernetesoperator est d'utiliser Go pour combiner kubebuilder et contrôleur-runtime. 1. Comprendre le modèle de l'opérateur: définir les ressources personnalisées via CRD, écrivez un contrôleur pour écouter les changements de ressources et effectuer des boucles de réconciliation pour maintenir l'état attendu. 2. Utilisez KubeBuilder pour initialiser le projet et créer des API pour générer automatiquement les CRD, les contrôleurs et les fichiers de configuration. 3. Définissez la structure de spécification et de statut de CRD dans API / V1 / MYAPP_TYPES.go, et exécutez MakeManifests pour générer CRDYAML. 4. Réconcilier dans le contrôleur

La panique est comme un programme "cardiaque" en Go. Le récupération peut être utilisé comme "outil de premiers soins" pour éviter les accidents, mais récupérer ne prend effet que dans la fonction de différence. 1. Le débit est utilisé pour éviter les laps de service, les journaux de journaux et les erreurs amicales de retour. 2. Il doit être utilisé en conjonction avec un repère et ne prend effet que sur la même goroutine. Le programme ne revient pas au point de panique après la récupération. 3. Il est recommandé de l'utiliser au niveau supérieur ou à l'entrée critique, et ne vous en abusez pas, et n'accordez pas la priorité à l'utilisation du traitement des erreurs. 4. Le modèle commun est d'encapsuler des fonctions Saferun pour envelopper une éventuelle logique de panique. Ce n'est qu'en maîtrisant ses scénarios d'utilisation et ses limitations qu'il peut jouer correctement son rôle.

L'allocation de pile convient aux petites variables locales avec des cycles de vie clairs et est automatiquement géré, à vitesse rapide mais de nombreuses restrictions; L'allocation de tas est utilisée pour les données avec des cycles de vie longs ou incertains, et est flexible mais a un coût de performance. Le compilateur GO détermine automatiquement la position d'allocation variable par analyse d'échappement. Si la variable peut s'échapper de la portée de la fonction actuelle, elle sera allouée au tas. Les situations courantes qui provoquent une évasion comprennent: le renvoi des pointeurs de variables locales, l'attribution de valeurs aux types d'interface et le passage des goroutines. Les résultats de l'analyse d'échappement peuvent être visualisés via -gcflags = "- m". Lorsque vous utilisez des pointeurs, vous devez faire attention au cycle de vie variable pour éviter les évasions inutiles.

Le langage GO peut être utilisé pour les calculs scientifiques et l'analyse numérique, mais il faut comprendre. L'avantage réside dans le support et les performances de la concurrence, qui conviennent aux algorithmes parallèles tels que la solution distribuée, la simulation Monte Carlo, etc.; Les bibliothèques communautaires telles que Gonum et MAT64 offrent des fonctions de calcul numérique de base; La programmation hybride peut être utilisée pour appeler C / C et Python via CGO ou Interface pour améliorer la praticité. La limitation est que l'écosystème n'est pas aussi mature que Python, la visualisation et les outils avancés sont plus faibles et certains documents de bibliothèque sont incomplets. Il est recommandé de sélectionner des scénarios appropriés en fonction des fonctionnalités GO et de se référer aux exemples de code source pour les utiliser en profondeur.

HTTP Log Middleware dans GO peut enregistrer les méthodes de demande, les chemins de requête, la propriété intellectuelle du client et le temps qui prend du temps. 1. Utilisez http.handlerfunc pour envelopper le processeur, 2. Enregistrez l'heure de début et l'heure de fin avant et après l'appel Suivant.Servehttp, 3. Obtenez le vrai client IP via R.RemoteAddr et X-Forwared-For Headers, 4. Utilisez le log.printf aux journaux de demande de sortie, 5. L'exemple de code complet a été vérifié pour s'exécuter et convient au démarrage d'un projet petit et moyen. Les suggestions d'extension incluent la capture des codes d'état, la prise en charge des journaux JSON et le suivi des ID de demande.

Utilisez fmt.scanf pour lire l'entrée formatée, adaptée aux données structurées simples, mais la chaîne est coupée lors de la rencontre des espaces; 2. Il est recommandé d'utiliser Bufio.Scanner pour lire la ligne par ligne, prend en charge les entrées multi-lignes, la détection EOF et l'entrée du pipeline et peut gérer les erreurs de numérisation; 3. Utilisez io.readall (os.stdin) pour lire toutes les entrées à la fois, adapté au traitement de grandes données de bloc ou de flux de fichiers; 4. La réponse clé en temps réel nécessite des bibliothèques tierces telles que golang.org/x/term, et Bufio est suffisant pour les scénarios conventionnels; Suggestions pratiques: utilisez fmt.scan pour une entrée simple interactive, utilisez bufio.scanner pour une entrée de ligne ou un pipeline, utilisez io.readall pour les données de gros bloc et gérez toujours

L'instruction Switch de Go ne sera pas exécutée tout au long du processus par défaut et quittera automatiquement après la correspondance de la première condition. 1. Switch commence par un mot-clé et peut transporter une valeur ou aucune valeur; 2. Les matchs de cas de haut en bas dans l'ordre, seul le premier match est exécuté; 3. Plusieurs conditions peuvent être répertoriées par des virgules pour correspondre au même cas; 4. Il n'est pas nécessaire d'ajouter manuellement la pause, mais peut être forcé; 5.Default est utilisé pour les cas inégalés, généralement placés à la fin.

Les génériques GO sont pris en charge depuis 1.18 et sont utilisés pour écrire du code générique pour le type de type. 1. La fonction générique imprimslice [tany] (s [] t) peut imprimer des tranches de tout type, telles que [] int ou [] chaîne. 2. Grâce au nombre de contraintes de type, les limites des types numériques tels que int et float, sum [tnumber] (tranche [] t) t La sommation sûre est réalisée. 3. La boîte de type générique de structure [tany] struct {Valuet} peut encapsuler n'importe quelle valeur de type et être utilisée avec le constructeur newbox [tany] (VT) * Box [t]. 4. Ajouter SET (VT) et Get () T Méthodes T à boxer [t] sans
