分散システムを構築する場合、Amazon SQS などのメッセージキューは非同期ワークロードの処理において重要な役割を果たします。この投稿では、Keycloak のユーザー登録イベントを処理する堅牢な SQS コンシューマーを Go に実装した私の経験を共有します。このソリューションは、ファンアウト/ファンイン同時実行パターンを使用して、システム リソースを圧迫することなくメッセージを効率的に処理します。
私は興味深い問題に直面しました。それは、Keycloak にユーザーを登録するために毎日約 50,000 の SQS イベントを処理することです。単純なアプローチでは、メッセージごとに新しい goroutine が生成される可能性がありますが、これはすぐにリソースの枯渇につながる可能性があります。同時実行に対して、より制御されたアプローチが必要でした。
ファンアウト/ファンイン パターンは、次の理由からこのユースケースに最適です。
まず、基本的な消費者の構造を見てみましょう:
type Consumer struct { Client *sqs.Client QueueName string }
実装は 3 つの主要コンポーネントで構成されます:
コンシューマを開始する方法は次のとおりです:
func StartPool[requestBody any]( serviceFunc func(c context.Context, dto *requestBody) error, consumer *Consumer) { ctx := context.Background() params := &sqs.ReceiveMessageInput{ MaxNumberOfMessages: 10, QueueUrl: aws.String(consumer.QueueName), WaitTimeSeconds: 20, VisibilityTimeout: 30, MessageAttributeNames: []string{ string(types.QueueAttributeNameAll), }, } msgCh := make(chan types.Message) var wg sync.WaitGroup // Start worker pool first startPool(ctx, msgCh, &wg, consumer, serviceFunc) // Then start receiving messages // ... rest of the implementation }
重要な SQS 構成パラメータを調べてみましょう:
ワーカー プールは、ファンアウト パターンが作用する場所です。
func startPool[requestBody any]( ctx context.Context, msgCh chan types.Message, wg *sync.WaitGroup, consumer *Consumer, serviceFunc func(c context.Context, dto *requestBody) error) { processingMessages := &sync.Map{} // Start 10 workers for i := 0; i < 10; i++ { go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc) } }
重複メッセージの処理を防ぐために sync.Map を使用します。
type Consumer struct { Client *sqs.Client QueueName string }
ファンアウト/ファンイン パターンは、Go で大量の SQS メッセージを処理するための洗練されたソリューションを提供します。固定ワーカー プールを維持することで、効率的なメッセージ処理を確保しながら、無制限の goroutine 作成の落とし穴を回避します。
このようなパターンを実装するときは、常に特定のユースケースを考慮することを忘れないでください。ここに表示される構成値 (ワーカー数、タイムアウト値など) は、要件とリソースの制約に基づいて調整する必要があります。
ソースコード: [利用可能な場合はリポジトリへのリンク]
タグ: #golang #aws #sqs #concurrency #distributed-systems
以上がGo でスケーラブルな SQS コンシューマを構築するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。