Beim Aufbau verteilter Systeme spielen Nachrichtenwarteschlangen wie Amazon SQS eine entscheidende Rolle bei der Bewältigung asynchroner Arbeitslasten. In diesem Beitrag teile ich meine Erfahrungen bei der Implementierung eines robusten SQS-Consumers in Go, der Benutzerregistrierungsereignisse für Keycloak verarbeitet. Die Lösung nutzt das Fan-Out/Fan-In-Parallelitätsmuster, um Nachrichten effizient zu verarbeiten, ohne die Systemressourcen zu überlasten.
Ich stand vor einem interessanten Problem: Verarbeiten Sie täglich etwa 50.000 SQS-Ereignisse, um Benutzer in Keycloak zu registrieren. Ein naiver Ansatz könnte für jede Nachricht eine neue Goroutine erzeugen, aber dies könnte schnell zur Erschöpfung der Ressourcen führen. Wir brauchten einen kontrollierteren Ansatz für die Parallelität.
Das Fan-Out/Fan-In-Muster ist für diesen Anwendungsfall perfekt, weil es:
Schauen wir uns zunächst unsere grundlegende Verbraucherstruktur an:
type Consumer struct { Client *sqs.Client QueueName string }
Die Implementierung besteht aus drei Hauptkomponenten:
So starten wir den Verbraucher:
func StartPool[requestBody any]( serviceFunc func(c context.Context, dto *requestBody) error, consumer *Consumer) { ctx := context.Background() params := &sqs.ReceiveMessageInput{ MaxNumberOfMessages: 10, QueueUrl: aws.String(consumer.QueueName), WaitTimeSeconds: 20, VisibilityTimeout: 30, MessageAttributeNames: []string{ string(types.QueueAttributeNameAll), }, } msgCh := make(chan types.Message) var wg sync.WaitGroup // Start worker pool first startPool(ctx, msgCh, &wg, consumer, serviceFunc) // Then start receiving messages // ... rest of the implementation }
Lassen Sie uns die entscheidenden SQS-Konfigurationsparameter untersuchen:
Im Worker-Pool kommt das Fan-out-Muster ins Spiel:
func startPool[requestBody any]( ctx context.Context, msgCh chan types.Message, wg *sync.WaitGroup, consumer *Consumer, serviceFunc func(c context.Context, dto *requestBody) error) { processingMessages := &sync.Map{} // Start 10 workers for i := 0; i < 10; i++ { go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc) } }
Wir verwenden eine sync.Map, um die Verarbeitung doppelter Nachrichten zu verhindern:
type Consumer struct { Client *sqs.Client QueueName string }
Das Fan-Out/Fan-In-Muster bietet eine elegante Lösung für die Verarbeitung großer SQS-Nachrichten in Go. Durch die Aufrechterhaltung eines festen Worker-Pools vermeiden wir die Fallstricke einer unbegrenzten Goroutine-Erstellung und stellen gleichzeitig eine effiziente Nachrichtenverarbeitung sicher.
Denken Sie daran, bei der Implementierung solcher Muster immer Ihren spezifischen Anwendungsfall zu berücksichtigen. Die hier angezeigten Konfigurationswerte (Worker-Anzahl, Timeout-Werte usw.) sollten basierend auf Ihren Anforderungen und Ressourcenbeschränkungen angepasst werden.
Quellcode: [Link zu Ihrem Repository, falls verfügbar]
Tags: #golang #aws #sqs #concurrency #distributed-systems
Das obige ist der detaillierte Inhalt vonErstellen eines skalierbaren SQS-Verbrauchers in Go. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!