Maison développement back-end Golang Implémenter Kafka avec Golang

Implémenter Kafka avec Golang

May 10, 2023 pm 01:18 PM

Avec la complexité croissante de l'architecture des applications au niveau de l'entreprise, la transmission des messages est devenue un élément crucial. C’est alors que Kafka apparaît. Kafka est une file d'attente de messages distribuée efficace et fiable qui prend en charge la publication et l'abonnement des messages. Il s'agit d'un système de messagerie d'entreprise moderne avec un débit très élevé et une faible latence. Dans l'API de Kafka, bien que le client officiel propose plusieurs langues, Golang est devenu de plus en plus largement utilisé ces dernières années, c'est pourquoi cet article utilise Golang comme langage d'implémentation pour expliquer comment utiliser Golang pour implémenter Kafka.

1. Dépendances

Avant de commencer, vous devez télécharger les dépendances requises :

  • sarama : bibliothèque client Golang Kafka
  • pkg/errors : encapsuler le package d'erreurs de la bibliothèque standard Go

Méthode d'utilisation spécifique Comme suit :

allez sur github.com/Shopify/sarama
allez sur github.com/pkg/errors

2. Créer un producteur

Avant d'introduire l'API de Kafka, vous devez d'abord créer une instance de producteur. Le code du producteur est le suivant :

package main

import (
    "fmt"
    "time"

    "github.com/pkg/errors"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create producer"))
    }
    defer producer.Close()

    for i := 0; i < 10; i++ {
        message := &sarama.ProducerMessage{
            Topic: "test_topic",
            Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)),
        }
        partition, offset, err := producer.SendMessage(message)
        if err != nil {
            fmt.Println(errors.Wrapf(err, "failed to send message: %s", message))
        } else {
            fmt.Printf("message sent to partition %d at offset %d
", partition, offset)
        }

        time.Sleep(500 * time.Millisecond) // 延迟发送
    }
}

Le code fait principalement les choses suivantes :

  • Configurer le producteur : définissez la configuration du producteur, spécifiez la méthode de partitionnement comme partitionnement aléatoire, et attendez que tous les nœuds ISR confirmez le message, puis revenez et renvoyez la partition et le décalage après une transmission réussie.
  • Créer un producteur : créez une instance de producteur avec l'adresse et la configuration du courtier spécifiées.
  • Envoyer un message : créez un message avec le sujet et le contenu du message, puis envoyez-le.
  • Résultats de sortie : résultats d'impression, partition d'enregistrement et décalage des messages.

3. Créer un consommateur

Deuxièmement, vous devez créer une instance de consommateur. Le code du consommateur est le suivant :

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
    "github.com/pkg/errors"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(errors.Wrap(err, "failed to create consumer"))
    }
    defer consumer.Close()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    partitions, err := consumer.Partitions("test_topic")
    if err != nil {
        panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic"))
    }

    ctx, cancel := context.WithCancel(context.Background())

    for _, partition := range partitions {
        go func(partition int32) {
            partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest)
            if err != nil {
                fmt.Printf("failed to create partition consumer for partition %d: %s
", partition, err)
                return
            }
            defer partitionConsumer.Close()

            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    fmt.Printf("Consumed message from partition %d at offset %d: %s
", msg.Partition, msg.Offset, msg.Value)
                case <-signals:
                    cancel()
                    return
                case err := <-partitionConsumer.Errors():
                    fmt.Printf("Consumed error from partition %d: %s
", partition, err)
                case <-ctx.Done():
                    return
                }
            }
        }(partition)
    }

    <-signals
    fmt.Println("Shutting down consumer")
}

Le code fait principalement les choses suivantes :

  • Configurer le consommateur : configurez le consommateur et définissez le commutateur de retour d'erreur.
  • Créer un consommateur : créez une instance de consommateur basée sur l'adresse et la configuration du courtier spécifiées.
  • Obtenir la partition : obtenez la partition du sujet spécifié.
  • Consommation : ouvrez une goroutine pour chaque partition pour une consommation séparée.
  • Résultats de sortie : imprimez les messages consommés.

IV.Résumé

Ci-dessus, nous avons utilisé Golang pour implémenter les parties producteur et consommateur de Kafka. En tant que l'un des composants importants de la mise en œuvre de systèmes distribués, Kafka peut résoudre le problème des systèmes de messagerie existant dans les environnements distribués et à haute concurrence. problèmes, et Kafka dispose également d'une bonne documentation de support et d'une communauté stable, ce qui rend son application sans stress dans le développement réel.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

Outils d'IA chauds

Undress AI Tool

Undress AI Tool

Images de déshabillage gratuites

Undresser.AI Undress

Undresser.AI Undress

Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover

AI Clothes Remover

Outil d'IA en ligne pour supprimer les vêtements des photos.

Clothoff.io

Clothoff.io

Dissolvant de vêtements AI

Video Face Swap

Video Face Swap

Échangez les visages dans n'importe quelle vidéo sans effort grâce à notre outil d'échange de visage AI entièrement gratuit !

Outils chauds

Bloc-notes++7.3.1

Bloc-notes++7.3.1

Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise

SublimeText3 version chinoise

Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1

Envoyer Studio 13.0.1

Puissant environnement de développement intégré PHP

Dreamweaver CS6

Dreamweaver CS6

Outils de développement Web visuel

SublimeText3 version Mac

SublimeText3 version Mac

Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Sujets chauds

Tutoriel PHP
1545
276
Développer des opérateurs de Kubernetes en Go Développer des opérateurs de Kubernetes en Go Jul 25, 2025 am 02:38 AM

La façon la plus efficace d'écrire un kubernetesoperator est d'utiliser Go pour combiner kubebuilder et contrôleur-runtime. 1. Comprendre le modèle de l'opérateur: définir les ressources personnalisées via CRD, écrivez un contrôleur pour écouter les changements de ressources et effectuer des boucles de réconciliation pour maintenir l'état attendu. 2. Utilisez KubeBuilder pour initialiser le projet et créer des API pour générer automatiquement les CRD, les contrôleurs et les fichiers de configuration. 3. Définissez la structure de spécification et de statut de CRD dans API / V1 / MYAPP_TYPES.go, et exécutez MakeManifests pour générer CRDYAML. 4. Réconcilier dans le contrôleur

Comment se remettre d'une panique en Go? Comment se remettre d'une panique en Go? Jul 23, 2025 am 04:11 AM

La panique est comme un programme "cardiaque" en Go. Le récupération peut être utilisé comme "outil de premiers soins" pour éviter les accidents, mais récupérer ne prend effet que dans la fonction de différence. 1. Le débit est utilisé pour éviter les laps de service, les journaux de journaux et les erreurs amicales de retour. 2. Il doit être utilisé en conjonction avec un repère et ne prend effet que sur la même goroutine. Le programme ne revient pas au point de panique après la récupération. 3. Il est recommandé de l'utiliser au niveau supérieur ou à l'entrée critique, et ne vous en abusez pas, et n'accordez pas la priorité à l'utilisation du traitement des erreurs. 4. Le modèle commun est d'encapsuler des fonctions Saferun pour envelopper une éventuelle logique de panique. Ce n'est qu'en maîtrisant ses scénarios d'utilisation et ses limitations qu'il peut jouer correctement son rôle.

Stack vs tas-tas allocation avec des pointeurs en Go Stack vs tas-tas allocation avec des pointeurs en Go Jul 23, 2025 am 04:14 AM

L'allocation de pile convient aux petites variables locales avec des cycles de vie clairs et est automatiquement géré, à vitesse rapide mais de nombreuses restrictions; L'allocation de tas est utilisée pour les données avec des cycles de vie longs ou incertains, et est flexible mais a un coût de performance. Le compilateur GO détermine automatiquement la position d'allocation variable par analyse d'échappement. Si la variable peut s'échapper de la portée de la fonction actuelle, elle sera allouée au tas. Les situations courantes qui provoquent une évasion comprennent: le renvoi des pointeurs de variables locales, l'attribution de valeurs aux types d'interface et le passage des goroutines. Les résultats de l'analyse d'échappement peuvent être visualisés via -gcflags = "- m". Lorsque vous utilisez des pointeurs, vous devez faire attention au cycle de vie variable pour éviter les évasions inutiles.

Optez pour l'informatique scientifique et l'analyse numérique Optez pour l'informatique scientifique et l'analyse numérique Jul 23, 2025 am 01:53 AM

Le langage GO peut être utilisé pour les calculs scientifiques et l'analyse numérique, mais il faut comprendre. L'avantage réside dans le support et les performances de la concurrence, qui conviennent aux algorithmes parallèles tels que la solution distribuée, la simulation Monte Carlo, etc.; Les bibliothèques communautaires telles que Gonum et MAT64 offrent des fonctions de calcul numérique de base; La programmation hybride peut être utilisée pour appeler C / C et Python via CGO ou Interface pour améliorer la praticité. La limitation est que l'écosystème n'est pas aussi mature que Python, la visualisation et les outils avancés sont plus faibles et certains documents de bibliothèque sont incomplets. Il est recommandé de sélectionner des scénarios appropriés en fonction des fonctionnalités GO et de se référer aux exemples de code source pour les utiliser en profondeur.

Passez l'exemple de l'exemple de journalisation du middleware http Passez l'exemple de l'exemple de journalisation du middleware http Aug 03, 2025 am 11:35 AM

HTTP Log Middleware dans GO peut enregistrer les méthodes de demande, les chemins de requête, la propriété intellectuelle du client et le temps qui prend du temps. 1. Utilisez http.handlerfunc pour envelopper le processeur, 2. Enregistrez l'heure de début et l'heure de fin avant et après l'appel Suivant.Servehttp, 3. Obtenez le vrai client IP via R.RemoteAddr et X-Forwared-For Headers, 4. Utilisez le log.printf aux journaux de demande de sortie, 5. L'exemple de code complet a été vérifié pour s'exécuter et convient au démarrage d'un projet petit et moyen. Les suggestions d'extension incluent la capture des codes d'état, la prise en charge des journaux JSON et le suivi des ID de demande.

La lecture de Stdin dans GO par l'exemple La lecture de Stdin dans GO par l'exemple Jul 27, 2025 am 04:15 AM

Utilisez fmt.scanf pour lire l'entrée formatée, adaptée aux données structurées simples, mais la chaîne est coupée lors de la rencontre des espaces; 2. Il est recommandé d'utiliser Bufio.Scanner pour lire la ligne par ligne, prend en charge les entrées multi-lignes, la détection EOF et l'entrée du pipeline et peut gérer les erreurs de numérisation; 3. Utilisez io.readall (os.stdin) pour lire toutes les entrées à la fois, adapté au traitement de grandes données de bloc ou de flux de fichiers; 4. La réponse clé en temps réel nécessite des bibliothèques tierces telles que golang.org/x/term, et Bufio est suffisant pour les scénarios conventionnels; Suggestions pratiques: utilisez fmt.scan pour une entrée simple interactive, utilisez bufio.scanner pour une entrée de ligne ou un pipeline, utilisez io.readall pour les données de gros bloc et gérez toujours

Comment fonctionne l'instruction Switch? Comment fonctionne l'instruction Switch? Jul 30, 2025 am 05:11 AM

L'instruction Switch de Go ne sera pas exécutée tout au long du processus par défaut et quittera automatiquement après la correspondance de la première condition. 1. Switch commence par un mot-clé et peut transporter une valeur ou aucune valeur; 2. Les matchs de cas de haut en bas dans l'ordre, seul le premier match est exécuté; 3. Plusieurs conditions peuvent être répertoriées par des virgules pour correspondre au même cas; 4. Il n'est pas nécessaire d'ajouter manuellement la pause, mais peut être forcé; 5.Default est utilisé pour les cas inégalés, généralement placés à la fin.

Passez par l'exemple des génériques Passez par l'exemple des génériques Jul 29, 2025 am 04:10 AM

Les génériques GO sont pris en charge depuis 1.18 et sont utilisés pour écrire du code générique pour le type de type. 1. La fonction générique imprimslice [tany] (s [] t) peut imprimer des tranches de tout type, telles que [] int ou [] chaîne. 2. Grâce au nombre de contraintes de type, les limites des types numériques tels que int et float, sum [tnumber] (tranche [] t) t La sommation sûre est réalisée. 3. La boîte de type générique de structure [tany] struct {Valuet} peut encapsuler n'importe quelle valeur de type et être utilisée avec le constructeur newbox [tany] (VT) * Box [t]. 4. Ajouter SET (VT) et Get () T Méthodes T à boxer [t] sans

See all articles