在建構分散式系統時,像 Amazon SQS 這樣的訊息佇列在處理非同步工作負載方面發揮著至關重要的作用。在這篇文章中,我將分享我在 Go 中實現強大的 SQS 消費者的經驗,該消費者可以處理 Keycloak 的用戶註冊事件。此解決方案使用扇出/扇入並發模式來有效處理訊息,而不會佔用系統資源。
我遇到了一個有趣的問題:每天處理大約 50,000 個 SQS 事件以在 Keycloak 中註冊用戶。一種幼稚的方法可能會為每個訊息產生一個新的 goroutine,但這可能很快就會導致資源耗盡。我們需要一種更受控制的並發方法。
扇出/扇入模式非常適合此用例,因為它:
首先我們來看看我們的基本消費結構:
type Consumer struct { Client *sqs.Client QueueName string }
實現由三個主要組件組成:
這是我們啟動消費者的方式:
func StartPool[requestBody any]( serviceFunc func(c context.Context, dto *requestBody) error, consumer *Consumer) { ctx := context.Background() params := &sqs.ReceiveMessageInput{ MaxNumberOfMessages: 10, QueueUrl: aws.String(consumer.QueueName), WaitTimeSeconds: 20, VisibilityTimeout: 30, MessageAttributeNames: []string{ string(types.QueueAttributeNameAll), }, } msgCh := make(chan types.Message) var wg sync.WaitGroup // Start worker pool first startPool(ctx, msgCh, &wg, consumer, serviceFunc) // Then start receiving messages // ... rest of the implementation }
讓我們檢查一下關鍵的 SQS 設定參數:
工作池是扇出模式發揮作用的地方:
func startPool[requestBody any]( ctx context.Context, msgCh chan types.Message, wg *sync.WaitGroup, consumer *Consumer, serviceFunc func(c context.Context, dto *requestBody) error) { processingMessages := &sync.Map{} // Start 10 workers for i := 0; i < 10; i++ { go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc) } }
我們使用sync.Map來防止處理重複訊息:
type Consumer struct { Client *sqs.Client QueueName string }
扇出/扇入模式為在 Go 中處理大量 SQS 訊息提供了一個優雅的解決方案。透過維護固定的工作池,我們可以避免無限制的 goroutine 創建的陷阱,同時確保高效的訊息處理。
請記住在實現此類模式時始終考慮您的特定用例。此處顯示的配置值(工作執行緒數、逾時值等)應根據您的要求和資源限制進行調整。
原始碼:[連結到您的儲存庫(如果有)]
標籤:#golang #aws #sqs #concurrency #distributed-systems
以上是用 Go 建構可擴展的 SQS 消費者的詳細內容。更多資訊請關注PHP中文網其他相關文章!