
Building a Scalable SQS Consumer in Go



Introduction

When building distributed systems, message queues like Amazon SQS play a crucial role in handling asynchronous workloads. In this post, I'll share my experience implementing a robust SQS consumer in Go that handles user registration events for Keycloak. The solution uses the fan-out/fan-in concurrency pattern to process messages efficiently without overwhelming system resources.

The Challenge

I faced an interesting problem: process around 50,000 SQS events daily to register users in Keycloak. A naive approach might spawn a new goroutine for each message, but this could quickly lead to resource exhaustion. We needed a more controlled approach to concurrency.

Why Fan-out/Fan-in?

The fan-out/fan-in pattern is perfect for this use case because it:

  • Maintains a fixed pool of worker goroutines
  • Distributes work evenly across workers
  • Prevents resource exhaustion
  • Provides better control over concurrent operations

Implementation Deep Dive

1. The Consumer Structure

First, let's look at our basic consumer structure:

type Consumer struct {
    Client    *sqs.Client
    QueueName string
}

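Constructing the consumer isn't shown in the post, but with the AWS SDK for Go v2 it could look roughly like this. This is a minimal sketch: NewConsumer is an illustrative helper, and QueueName is assumed to hold the full queue URL, since it is later passed as QueueUrl when polling.

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

// NewConsumer builds a Consumer from the default AWS configuration
// (environment variables, shared config files, or an instance role).
func NewConsumer(ctx context.Context, queueURL string) (*Consumer, error) {
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        return nil, err
    }
    return &Consumer{
        Client:    sqs.NewFromConfig(cfg),
        QueueName: queueURL, // passed as QueueUrl when receiving messages
    }, nil
}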

2. Message Processing Pipeline

The implementation consists of three main components:

  1. Message Receiver: Continuously polls SQS for new messages
  2. Worker Pool: Fixed number of goroutines processing messages
  3. Message Channel: Connects the receiver to workers

Here's how we start the consumer:

func StartPool[requestBody any](
    serviceFunc func(c context.Context, dto *requestBody) error,
    consumer *Consumer) {

    ctx := context.Background()
    params := &sqs.ReceiveMessageInput{
        MaxNumberOfMessages: 10,
        QueueUrl:           aws.String(consumer.QueueName),
        WaitTimeSeconds:    20,
        VisibilityTimeout:  30,
        MessageAttributeNames: []string{
            string(types.QueueAttributeNameAll),
        },
    }

    msgCh := make(chan types.Message)
    var wg sync.WaitGroup

    // Start worker pool first
    startPool(ctx, msgCh, &wg, consumer, serviceFunc)

    // Then start receiving messages
    // ... rest of the implementation
}

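The receiving side is elided above. As a rough illustration (not the original implementation), a long-polling loop that fans messages out to the workers could look like the following; it assumes the standard library log package plus the SDK's sqs and types packages are imported.

// receiveLoop long-polls SQS and fans messages out to the worker channel.
// Illustrative sketch: error handling and shutdown are deliberately simple.
func receiveLoop(ctx context.Context, consumer *Consumer,
    params *sqs.ReceiveMessageInput, msgCh chan<- types.Message) {

    for {
        select {
        case <-ctx.Done():
            close(msgCh) // lets the workers' range loops finish
            return
        default:
        }

        out, err := consumer.Client.ReceiveMessage(ctx, params)
        if err != nil {
            log.Printf("receive message: %v", err)
            continue // a short sleep or backoff would be sensible here
        }

        for _, msg := range out.Messages {
            msgCh <- msg // fan-out: any free worker picks this up
        }
    }
}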

3. Key Configuration Parameters

Let's examine the crucial SQS configuration parameters:

  • MaxNumberOfMessages (10): Batch size for each poll
  • WaitTimeSeconds (20): Long polling duration
  • VisibilityTimeout (30): Grace period for message processing; it can also be extended per message, as shown in the sketch after this list
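If a message occasionally needs more time than the visibility timeout allows, the timeout can be extended for that specific message with the SDK's ChangeMessageVisibility call. A minimal sketch (extendVisibility is an illustrative helper, not part of the original code):

// extendVisibility gives an in-flight message more time before SQS
// makes it visible to other consumers again.
func extendVisibility(ctx context.Context, consumer *Consumer,
    msg types.Message, seconds int32) error {

    _, err := consumer.Client.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{
        QueueUrl:          aws.String(consumer.QueueName),
        ReceiptHandle:     msg.ReceiptHandle,
        VisibilityTimeout: seconds,
    })
    return err
}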

4. Worker Pool Implementation

The worker pool is where the fan-out pattern comes into play:

func startPool[requestBody any](
    ctx context.Context,
    msgCh chan types.Message,
    wg *sync.WaitGroup,
    consumer *Consumer,
    serviceFunc func(c context.Context, dto *requestBody) error) {

    processingMessages := &sync.Map{}

    // Start 10 workers
    for i := 0; i < 10; i++ {
        go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc)
    }
}

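The worker function itself isn't listed in this post, so here is a sketch of what each worker might do, assuming the message body is JSON that unmarshals into the generic requestBody type. The logging and error handling here are illustrative, not the original code; it assumes encoding/json, log, and the SDK's aws, sqs, and types packages are imported.

// worker drains msgCh, invokes the service function, and deletes a message
// only when processing succeeds. processingMessages and wg mirror the
// signature used in startPool; the duplicate check is shown in the next section.
func worker[requestBody any](
    ctx context.Context,
    msgCh <-chan types.Message,
    wg *sync.WaitGroup,
    consumer *Consumer,
    processingMessages *sync.Map,
    serviceFunc func(c context.Context, dto *requestBody) error) {

    for msg := range msgCh {
        var dto requestBody
        if err := json.Unmarshal([]byte(aws.ToString(msg.Body)), &dto); err != nil {
            log.Printf("unmarshal message %s: %v", aws.ToString(msg.MessageId), err)
            continue
        }

        if err := serviceFunc(ctx, &dto); err != nil {
            log.Printf("process message %s: %v", aws.ToString(msg.MessageId), err)
            continue // message reappears after the visibility timeout
        }

        // Delete only after successful processing.
        if _, err := consumer.Client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
            QueueUrl:      aws.String(consumer.QueueName),
            ReceiptHandle: msg.ReceiptHandle,
        }); err != nil {
            log.Printf("delete message %s: %v", aws.ToString(msg.MessageId), err)
        }
    }
}

Deleting only after serviceFunc succeeds means a failed message simply becomes visible again once its visibility timeout expires and is retried on a later poll.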

5. Duplicate Message Handling

We use a sync.Map to track messages that are already in flight, so a message redelivered before deletion isn't processed twice.

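A minimal sketch of this check, keyed by the SQS MessageId (alreadyInFlight and finished are illustrative helper names):

// alreadyInFlight registers the MessageId in the shared map and reports
// whether another worker had already claimed it.
func alreadyInFlight(processing *sync.Map, msg types.Message) bool {
    _, loaded := processing.LoadOrStore(aws.ToString(msg.MessageId), struct{}{})
    return loaded
}

// finished releases the MessageId after the message has been processed and
// deleted, so the map does not grow without bound.
func finished(processing *sync.Map, msg types.Message) {
    processing.Delete(aws.ToString(msg.MessageId))
}

Inside the worker loop, a message for which alreadyInFlight returns true is skipped, and finished is called once the message has been deleted.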

Best Practices and Learnings

  1. Error Handling: Always handle errors gracefully and log them appropriately
  2. Message Cleanup: Delete messages only after successful processing
  3. Graceful Shutdown: Implement proper shutdown mechanisms using context (see the sketch after this list)
  4. Monitoring: Add logging at key points for observability
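
For point 3, one way to wire the shutdown is to derive the root context from OS signals and wait for the workers to drain. This is only a sketch and assumes StartPool is adapted to accept the context and WaitGroup from the caller:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
)

func main() {
    // Cancel the root context on Ctrl+C or SIGTERM so the receive loop
    // and the workers stop picking up new messages.
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    var wg sync.WaitGroup
    // The receive loop and worker pool would be started here with ctx and &wg.

    <-ctx.Done()
    log.Println("shutdown signal received; waiting for in-flight messages")
    wg.Wait()
}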

Performance Considerations

  • Worker Count: Choose based on your workload and available resources
  • Batch Size: Balance between throughput and processing time
  • Visibility Timeout: Set according to your average processing time

Future Improvements

  1. Dynamic Worker Scaling: Adjust worker count based on queue depth
  2. Circuit Breaker: Add circuit breaker for downstream services
  3. Metrics Collection: Add Prometheus metrics for monitoring
  4. Dead Letter Queue: Implement DLQ handling for failed messages
  5. Retries: Add exponential backoff for transient failures (a sketch follows this list)
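
For point 5, a simple retry wrapper with exponential backoff and jitter might look like this (a sketch assuming context, math/rand, and time are imported; the attempt count and base delay are arbitrary choices):

// retryWithBackoff retries fn until it succeeds, the context is cancelled,
// or the attempts run out, sleeping base, 2*base, 4*base, ... plus jitter.
func retryWithBackoff(ctx context.Context, attempts int, base time.Duration,
    fn func(context.Context) error) error {

    var err error
    for i := 0; i < attempts; i++ {
        if err = fn(ctx); err == nil {
            return nil
        }
        delay := base<<i + time.Duration(rand.Int63n(int64(base)))
        select {
        case <-time.After(delay):
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return err
}

A worker could then wrap the service call, e.g. retryWithBackoff(ctx, 5, 200*time.Millisecond, func(c context.Context) error { return serviceFunc(c, &dto) }).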

Conclusion

The fan-out/fan-in pattern provides an elegant solution for processing high-volume SQS messages in Go. By maintaining a fixed worker pool, we avoid the pitfalls of unbounded goroutine creation while ensuring efficient message processing.

Remember to always consider your specific use case when implementing such patterns. The configuration values shown here (worker count, timeout values, etc.) should be adjusted based on your requirements and resource constraints.


Source code: [Link to your repository if available]

Tags: #golang #aws #sqs #concurrency #distributed-systems
