Lors de la création de systèmes distribués, les files d'attente de messages comme Amazon SQS jouent un rôle crucial dans la gestion des charges de travail asynchrones. Dans cet article, je partagerai mon expérience dans la mise en œuvre d'un consommateur SQS robuste dans Go qui gère les événements d'enregistrement des utilisateurs pour Keycloak. La solution utilise le modèle de simultanéité fan-out/fan-in pour traiter les messages efficacement sans surcharger les ressources système.
J'ai été confronté à un problème intéressant : traiter quotidiennement environ 50 000 événements SQS pour enregistrer les utilisateurs dans Keycloak. Une approche naïve pourrait générer une nouvelle goroutine pour chaque message, mais cela pourrait rapidement conduire à un épuisement des ressources. Nous avions besoin d'une approche plus contrôlée de la concurrence.
Le modèle fan-out/fan-in est parfait pour ce cas d'utilisation car il :
Tout d'abord, regardons notre structure de consommation de base :
type Consumer struct { Client *sqs.Client QueueName string }
La mise en œuvre se compose de trois éléments principaux :
Voici comment nous démarrons le consommateur :
func StartPool[requestBody any]( serviceFunc func(c context.Context, dto *requestBody) error, consumer *Consumer) { ctx := context.Background() params := &sqs.ReceiveMessageInput{ MaxNumberOfMessages: 10, QueueUrl: aws.String(consumer.QueueName), WaitTimeSeconds: 20, VisibilityTimeout: 30, MessageAttributeNames: []string{ string(types.QueueAttributeNameAll), }, } msgCh := make(chan types.Message) var wg sync.WaitGroup // Start worker pool first startPool(ctx, msgCh, &wg, consumer, serviceFunc) // Then start receiving messages // ... rest of the implementation }
Examinons les paramètres de configuration SQS cruciaux :
Le pool de travailleurs est l'endroit où le modèle de répartition entre en jeu :
func startPool[requestBody any]( ctx context.Context, msgCh chan types.Message, wg *sync.WaitGroup, consumer *Consumer, serviceFunc func(c context.Context, dto *requestBody) error) { processingMessages := &sync.Map{} // Start 10 workers for i := 0; i < 10; i++ { go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc) } }
Nous utilisons un sync.Map pour éviter de traiter les messages en double :
type Consumer struct { Client *sqs.Client QueueName string }
Le modèle fan-out/fan-in fournit une solution élégante pour traiter des messages SQS à volume élevé dans Go. En maintenant un pool de travailleurs fixe, nous évitons les pièges de la création illimitée de goroutines tout en garantissant un traitement efficace des messages.
N'oubliez pas de toujours tenir compte de votre cas d'utilisation spécifique lors de la mise en œuvre de tels modèles. Les valeurs de configuration affichées ici (nombre de travailleurs, valeurs de délai d'attente, etc.) doivent être ajustées en fonction de vos besoins et des contraintes de ressources.
Code source : [Lien vers votre référentiel si disponible]
Balises : #golang #aws #sqs #concurrency #distributed-systems
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!