How to Implement Batch Message Consumption with RocketMQ in Spring Boot

1. Adding Dependencies
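First, add the necessary dependencies to your pom.xml file. The code in the later sections also imports Lombok and fastjson2, so those libraries must be on the classpath as well: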
<!-- RocketMQ Spring Boot dependency for Spring Boot 3 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Dependency compatible with MQ cluster version 5.3.0 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.3.0</version>
</dependency>
2. Configuration File bootstrap.yaml
Configure your RocketMQ settings in the bootstrap.yaml file:
rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
  consumer:
    group: consume-group-test
    access-key: access # Configure if ACL is used
    secret-key: secret
    consume-message-batch-max-size: 50 # Max messages per batch
    pull-batch-size: 100 # Max messages pulled from Broker
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1" # Use different groups for different business processes
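How the two size settings interact can be modeled in a few lines. The sketch below is a simplified, standalone model rather than RocketMQ code (the class `BatchModel` and the method `splitForListener` are hypothetical names): the messages returned by one pull are cut into chunks of at most consume-message-batch-max-size, so a single listener call can never receive more messages than one pull returned.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of how one pull result is cut into listener batches (illustrative only). */
final class BatchModel {

    private BatchModel() {
    }

    /**
     * Splits the messages returned by a single pull into listener batches.
     *
     * @param pulled          messages returned by one pull (at most pull-batch-size)
     * @param consumeBatchMax value of consume-message-batch-max-size
     * @return the batches in the order they would be handed to the listener
     */
    static <T> List<List<T>> splitForListener(List<T> pulled, int consumeBatchMax) {
        if (consumeBatchMax < 1) {
            throw new IllegalArgumentException("consumeBatchMax must be >= 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < pulled.size(); from += consumeBatchMax) {
            int to = Math.min(from + consumeBatchMax, pulled.size());
            // Copy the slice so each batch is independent of the source list
            batches.add(new ArrayList<>(pulled.subList(from, to)));
        }
        return batches;
    }
}
```

In this model, a full pull of 100 messages with a batch size of 50 produces two listener batches of 50, while a pull that returns only 32 messages produces a single batch of 32. This is why pull-batch-size should be at least as large as consume-message-batch-max-size.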
3. Configuration Class MqConfigProperties
Create the configuration class MqConfigProperties:
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
import java.io.Serializable;

/**
 * RocketMQ Configuration Class
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {
    private static final long serialVersionUID = 1L;

    @Autowired
    private RocketMQProperties rocketMQProperties;
    private TopicProperties topics;
    private GroupProperties groups;

    /**
     * Topic Configuration Class
     */
    @Data
    public static class TopicProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }

    /**
     * Consumer Group Configuration Class
     */
    @Data
    public static class GroupProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }
}
4. Implementing the Consumer Code
Create the consumer class UserConsumer:
import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import jakarta.annotation.Resource; // Spring Boot 3 uses the jakarta.* namespace instead of javax.*
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Batch Consumer Implementation
 */
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {
    @Resource
    private MqConfigProperties mqConfigProperties;
    @Resource
    private ApplicationContext applicationContext;
    private volatile boolean running;
    private DefaultMQPushConsumer consumer;

    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("Consumer is already running");
        }
        try {
            initConsumer();
        } catch (MQClientException e) {
            // subscribe() and start() throw a checked exception; fail application startup explicitly
            throw new IllegalStateException("Failed to start UserConsumer", e);
        }
        setRunning(true);
        log.info("UserConsumer started successfully.");
    }

    @Override
    public void stop() {
        if (isRunning() && consumer != null) {
            consumer.shutdown();
            setRunning(false);
            log.info("UserConsumer stopped.");
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    private void initConsumer() throws MQClientException {
        String topic = mqConfigProperties.getTopics().getProject();
        String group = mqConfigProperties.getGroups().getProject();
        String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
        String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
        String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();
        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
        consumer = rpcHook != null
                ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
                : new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Keep these two values in line with bootstrap.yaml. The client pulls 32 messages at a time
        // by default, so a batch can only reach 50 if the pull size is raised as well.
        consumer.setPullBatchSize(100);
        consumer.setConsumeMessageBatchMaxSize(50); // Max messages handed to the listener per call
        consumer.subscribe(topic, "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                log.info("Received {} messages", msgs.size());
                for (MessageExt message : msgs) {
                    // Decode with an explicit charset instead of the platform default
                    String body = new String(message.getBody(), StandardCharsets.UTF_8);
                    log.info("Processing message: {}", body);
                    User user = JSONObject.parseObject(body, User.class);
                    processUser(user);
                }
                // The status applies to the whole batch
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
    }

    private void processUser(User user) {
        log.info("Processing user with ID: {}", user.getId());
        // Handle user-related business logic
    }
}
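The listener above reports one status for the whole batch. When only part of a batch fails, `ConsumeConcurrentlyContext.setAckIndex(int)` can mark the last successfully processed position, so that only the messages after that index are sent back for retry. The standalone sketch below covers just the index computation; `AckIndexSketch` and `lastSuccessfulIndex` are hypothetical names, and the handler stands in for `processUser`.

```java
import java.util.List;
import java.util.function.Consumer;

/** Computes the ack index for a partially processed batch (illustrative only). */
final class AckIndexSketch {

    private AckIndexSketch() {
    }

    /**
     * Processes the batch in order and stops at the first failure.
     *
     * @return index of the last message processed successfully, or -1 if the first one failed
     */
    static <T> int lastSuccessfulIndex(List<T> batch, Consumer<T> handler) {
        for (int i = 0; i < batch.size(); i++) {
            try {
                handler.accept(batch.get(i));
            } catch (RuntimeException e) {
                // Everything from position i onward still needs to be retried
                return i - 1;
            }
        }
        return batch.size() - 1;
    }
}
```

Inside the listener, the result would be passed to `context.setAckIndex(...)` before returning `CONSUME_SUCCESS`. The failed message and every message after it in the batch are then redelivered, so the processing logic should be idempotent.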
5. Producer Example Code
To produce batch messages, you can use the following UserProducer class:
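import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

@Slf4j
public class UserProducer {
    // An already configured and started producer, supplied by the caller
    private final DefaultMQProducer producer;

    public UserProducer(DefaultMQProducer producer) {
        this.producer = producer;
    }

    public void sendBatchMessages(List<User> users, String topic) {
        List<Message> messages = new ArrayList<>();
        for (User user : users) {
            // Encode with an explicit charset so the consumer can decode reliably
            messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes(StandardCharsets.UTF_8)));
        }
        try {
            producer.send(messages); // Sends the whole list as one batch
        } catch (Exception e) {
            log.error("Error sending batch messages", e);
        }
    }
}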
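A single batch send is subject to a message size limit, so a large user list usually has to be split into several batches first. The sketch below is a standalone illustration of that splitting step over raw payloads; `PayloadSplitter`, `split`, and the `maxBatchBytes` parameter are hypothetical, and the right limit depends on your client and broker configuration.

```java
import java.util.ArrayList;
import java.util.List;

/** Splits payloads into batches whose total body size stays under a limit (illustrative only). */
final class PayloadSplitter {

    private PayloadSplitter() {
    }

    /**
     * @param payloads      serialized message bodies, in send order
     * @param maxBatchBytes assumed upper bound for the summed body size of one batch
     */
    static List<List<byte[]>> split(List<byte[]> payloads, int maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] payload : payloads) {
            if (payload.length > maxBatchBytes) {
                throw new IllegalArgumentException("Single payload exceeds the batch limit");
            }
            // Close the current batch when adding this payload would exceed the limit
            if (!current.isEmpty() && currentBytes + payload.length > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(payload);
            currentBytes += payload.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each resulting sub-list would then be turned into `Message` objects and passed to `producer.send(...)` separately. The sketch counts body bytes only; real messages also carry the topic and properties, so it is safer to leave some headroom below the actual limit.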
6. Additional Optimization Suggestions
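Performance Optimization: You can adjust the size of the consumer thread pool, which defaults to consumeThreadMin=20 and consumeThreadMax=20. In high-concurrency scenarios, increasing it with setConsumeThreadMin and setConsumeThreadMax on DefaultMQPushConsumer can raise throughput, provided downstream resources such as the database can keep up.
Error Handling: When consumption fails, be cautious with RECONSUME_LATER. In batch mode it causes every message in the batch to be redelivered, and a message that can never be processed will keep being retried until the retry limit is reached (16 attempts by default for concurrent consumers, after which it moves to the dead-letter queue). Set a maximum retry count based on your business requirements, for example with setMaxReconsumeTimes.
Tenant Isolation: Use a different consumer group for each business module so that one module never consumes messages intended for another. This is especially crucial in production environments.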
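One way to apply a business-specific retry limit inside the listener is to check how many times a message has already been redelivered (available from `MessageExt.getReconsumeTimes()`) before asking for another retry. The standalone sketch below models only that decision; `RetryPolicy`, `Decision`, and the threshold value are hypothetical.

```java
/** Decides whether a failed message should be retried again (illustrative only). */
final class RetryPolicy {

    enum Decision { RETRY, GIVE_UP }

    private final int maxRetries;

    RetryPolicy(int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        this.maxRetries = maxRetries;
    }

    /**
     * @param reconsumeTimes how many times the message has already been redelivered
     *                       (0 on the first delivery)
     */
    Decision onFailure(int reconsumeTimes) {
        return reconsumeTimes < maxRetries ? Decision.RETRY : Decision.GIVE_UP;
    }
}
```

In a listener, RETRY would map to returning RECONSUME_LATER, while GIVE_UP would mean logging or persisting the message for manual handling and returning CONSUME_SUCCESS so that it is not delivered again.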


