분산 시스템을 구축할 때 Amazon SQS와 같은 메시지 대기열은 비동기식 워크로드를 처리하는 데 중요한 역할을 합니다. 이 게시물에서는 Keycloak에 대한 사용자 등록 이벤트를 처리하는 강력한 SQS 소비자를 Go에서 구현한 경험을 공유하겠습니다. 이 솔루션은 팬아웃/팬인 동시성 패턴을 사용하여 시스템 리소스를 과도하게 사용하지 않고도 효율적으로 메시지를 처리합니다.
저는 흥미로운 문제에 직면했습니다. Keycloak에 사용자를 등록하기 위해 매일 약 50,000개의 SQS 이벤트를 처리합니다. 순진한 접근 방식은 각 메시지에 대해 새로운 고루틴을 생성할 수 있지만 이로 인해 리소스가 빠르게 고갈될 수 있습니다. 동시성에 대해 보다 통제된 접근 방식이 필요했습니다.
팬아웃/팬인 패턴은 다음과 같은 이유로 이 사용 사례에 적합합니다.
먼저 기본 소비자 구조를 살펴보겠습니다.
type Consumer struct { Client *sqs.Client QueueName string }
구현은 세 가지 주요 구성요소로 구성됩니다.
소비자를 시작하는 방법은 다음과 같습니다.
func StartPool[requestBody any]( serviceFunc func(c context.Context, dto *requestBody) error, consumer *Consumer) { ctx := context.Background() params := &sqs.ReceiveMessageInput{ MaxNumberOfMessages: 10, QueueUrl: aws.String(consumer.QueueName), WaitTimeSeconds: 20, VisibilityTimeout: 30, MessageAttributeNames: []string{ string(types.QueueAttributeNameAll), }, } msgCh := make(chan types.Message) var wg sync.WaitGroup // Start worker pool first startPool(ctx, msgCh, &wg, consumer, serviceFunc) // Then start receiving messages // ... rest of the implementation }
중요한 SQS 구성 매개변수를 살펴보겠습니다.
작업자 풀은 팬아웃 패턴이 작용하는 곳입니다.
func startPool[requestBody any]( ctx context.Context, msgCh chan types.Message, wg *sync.WaitGroup, consumer *Consumer, serviceFunc func(c context.Context, dto *requestBody) error) { processingMessages := &sync.Map{} // Start 10 workers for i := 0; i < 10; i++ { go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc) } }
중복 메시지 처리를 방지하기 위해 sync.Map을 사용합니다.
type Consumer struct { Client *sqs.Client QueueName string }
팬아웃/팬인 패턴은 Go에서 대용량 SQS 메시지를 처리하기 위한 우아한 솔루션을 제공합니다. 고정된 작업자 풀을 유지함으로써 효율적인 메시지 처리를 보장하는 동시에 무제한 고루틴 생성의 함정을 피합니다.
이러한 패턴을 구현할 때는 항상 구체적인 사용 사례를 고려해야 한다는 점을 기억하세요. 여기에 표시된 구성 값(작업자 수, 제한 시간 값 등)은 요구 사항 및 리소스 제약 조건에 따라 조정되어야 합니다.
소스 코드: [사용 가능한 경우 저장소 링크]
태그: #golang #aws #sqs #concurrency #분산 시스템
위 내용은 Go에서 확장 가능한 SQS 소비자 구축의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!