Java
javaLernprogramm
So implementieren Sie den Batch-Nachrichtenverbrauch mit RocketMQ in Spring Boot
So implementieren Sie den Batch-Nachrichtenverbrauch mit RocketMQ in Spring Boot

1. Abhängigkeiten hinzufügen
Fügen Sie zunächst die erforderlichen Abhängigkeiten zu Ihrer pom.xml-Datei hinzu:
<!-- RocketMQ Spring Boot dependency for Spring Boot 3 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Dependency compatible with MQ cluster version 5.3.0 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
2. Konfigurationsdatei bootstrap.yaml
Konfigurieren Sie Ihre RocketMQ-Einstellungen in der Datei „bootstrap.yaml“:
rocketmq:
name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
consumer:
group: consume-group-test
access-key: access # Configure if ACL is used
secret-key: secret
consume-message-batch-max-size: 50 # Max messages per batch
pull-batch-size: 100 # Max messages pulled from Broker
topics:
project: "group-topic-1"
groups:
project: "consume-group-1" # Use different groups for different business processes
3. Konfigurationsklasse MqConfigProperties
Erstellen Sie die Konfigurationsklasse MqConfigProperties:
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
import java.io.Serializable;
/**
* RocketMQ Configuration Class
*/
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {
private static final long serialVersionUID = 1L;
@Autowired
private RocketMQProperties rocketMQProperties;
private TopicProperties topics;
private GroupProperties groups;
/**
* Topic Configuration Class
*/
@Data
public static class TopicProperties implements Serializable {
private static final long serialVersionUID = 1L;
private String project;
}
/**
* Consumer Group Configuration Class
*/
@Data
public static class GroupProperties implements Serializable {
private static final long serialVersionUID = 1L;
private String project;
}
}
4. Umsetzung des Verbraucherschutzgesetzes
Erstellen Sie die Verbraucherklasse UserConsumer:
import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import java.util.List;
/**
* Batch Consumer Implementation
*/
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {
@Resource
private MqConfigProperties mqConfigProperties;
@Resource
private ApplicationContext applicationContext;
private volatile boolean running;
private DefaultMQPushConsumer consumer;
@Override
public void start() {
if (isRunning()) {
throw new IllegalStateException("Consumer is already running");
}
initConsumer();
setRunning(true);
log.info("UserConsumer started successfully.");
}
@Override
public void stop() {
if (isRunning() && consumer != null) {
consumer.shutdown();
setRunning(false);
log.info("UserConsumer stopped.");
}
}
@Override
public boolean isRunning() {
return running;
}
private void setRunning(boolean running) {
this.running = running;
}
private void initConsumer() {
String topic = mqConfigProperties.getTopics().getProject();
String group = mqConfigProperties.getGroups().getProject();
String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();
RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
consumer = rpcHook != null
? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
: new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(100); // Set the batch size for consumption
consumer.subscribe(topic, "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
log.info("Received {} messages", msgs.size());
for (MessageExt message : msgs) {
String body = new String(message.getBody());
log.info("Processing message: {}", body);
User user = JSONObject.parseObject(body, User.class);
processUser(user);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
}
private void processUser(User user) {
log.info("Processing user with ID: {}", user.getId());
// Handle user-related business logic
}
}
5. Beispielcode des Herstellers
Um Batch-Nachrichten zu erstellen, können Sie die folgende UserProducer-Klasse verwenden:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
public class UserProducer {
private DefaultMQProducer producer;
public void sendBatchMessages(List<User> users, String topic) {
List<Message> messages = new ArrayList<>();
for (User user : users) {
messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
}
try {
producer.send(messages);
} catch (Exception e) {
log.error("Error sending batch messages", e);
}
}
}
6. Zusätzliche Optimierungsvorschläge
Leistungsoptimierung: Sie können die Größe des Consumer-Thread-Pools anpassen. Standardmäßig ist es auf „consumerThreadMin=20“ und „consumemThreadMax=20“ eingestellt. In Szenarien mit hoher Parallelität kann die Erhöhung der Thread-Poolgröße die Leistung verbessern.
Fehlerbehandlung: Wenn der Verbrauch fehlschlägt, seien Sie bei RECONSUME_LATER vorsichtig, um endlose Wiederholungsschleifen zu vermeiden. Legen Sie eine maximale Wiederholungsanzahl basierend auf Ihren Geschäftsanforderungen fest.
Tenant Isolation: Verwenden Sie unterschiedliche Gruppen für unterschiedliche Geschäftsmodule, um zu vermeiden, dass Daten aus der falschen Gruppe verbraucht werden. Dies ist besonders in Produktionsumgebungen von entscheidender Bedeutung.
Das obige ist der detaillierte Inhalt vonSo implementieren Sie den Batch-Nachrichtenverbrauch mit RocketMQ in Spring Boot. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!
Heiße KI -Werkzeuge
Undress AI Tool
Ausziehbilder kostenlos
Undresser.AI Undress
KI-gestützte App zum Erstellen realistischer Aktfotos
AI Clothes Remover
Online-KI-Tool zum Entfernen von Kleidung aus Fotos.
Clothoff.io
KI-Kleiderentferner
Video Face Swap
Tauschen Sie Gesichter in jedem Video mühelos mit unserem völlig kostenlosen KI-Gesichtstausch-Tool aus!
Heißer Artikel
Heiße Werkzeuge
Notepad++7.3.1
Einfach zu bedienender und kostenloser Code-Editor
SublimeText3 chinesische Version
Chinesische Version, sehr einfach zu bedienen
Senden Sie Studio 13.0.1
Leistungsstarke integrierte PHP-Entwicklungsumgebung
Dreamweaver CS6
Visuelle Webentwicklungstools
SublimeText3 Mac-Version
Codebearbeitungssoftware auf Gottesniveau (SublimeText3)
Wie gehe ich mit Transaktionen in Java mit JDBC um?
Aug 02, 2025 pm 12:29 PM
Um JDBC -Transaktionen korrekt zu verarbeiten, müssen Sie zunächst den automatischen Komiti -Modus ausschalten und dann mehrere Vorgänge ausführen und schließlich entsprechend den Ergebnissen festlegen oder rollen. 1. Nennen Sie Conn.SetAutoCommit (False), um die Transaktion zu starten. 2. Führen Sie mehrere SQL -Operationen aus, z. B. einfügen und aktualisieren. 3. Rufen Sie Conn.Commit () an, wenn alle Vorgänge erfolgreich sind, und rufen Sie Conn.Rollback () auf, wenn eine Ausnahme auftritt, um die Datenkonsistenz zu gewährleisten. Gleichzeitig sollten Try-with-Ressourcen verwendet werden, um Ressourcen zu verwalten, Ausnahmen ordnungsgemäß zu behandeln und Verbindungen zu schließen, um Verbindungsleckage zu vermeiden. Darüber hinaus wird empfohlen, Verbindungspools zu verwenden und Save -Punkte zu setzen, um teilweise Rollback zu erreichen und Transaktionen so kurz wie möglich zu halten, um die Leistung zu verbessern.
Aufbau erholsamer APIs in Java mit Jakarta EE
Jul 30, 2025 am 03:05 AM
Setupamaven/GradleProjectWithjax-rsdependencies-ähnlich Jersey; 2. CreatearestresourcEUntationSuchas@pathand@Get; 3.ConfiguretheApplicationviaApplicationSubclitsorweb.xml;
Wie arbeite man mit Kalender in Java?
Aug 02, 2025 am 02:38 AM
Verwenden Sie Klassen im Java.Time -Paket, um das alte Datum und die Kalenderklassen zu ersetzen. 2. Erhalten Sie das aktuelle Datum und die aktuelle Uhrzeit durch LocalDate, LocalDatetime und Local Time; 3. Erstellen Sie ein bestimmtes Datum und eine bestimmte Uhrzeit mit der von () Methode; 4.. Verwenden Sie die Plus/Minus -Methode, um die Zeit nicht zu erhöhen und zu verkürzen. 5. Verwenden Sie ZonedDatetime und zoneId, um die Zeitzone zu verarbeiten. 6. Format und analysieren Sie Datumszeichenfolgen über DateTimeFormatter; 7. Verwenden Sie sofortige, um bei Bedarf mit den alten Datumstypen kompatibel zu sein. Die Verarbeitung der Datum in der modernen Java sollte der Verwendung von Java.Timeapi vorrangig machen, was klare, unveränderliche und linear ist
Beherrschen der Abhängigkeitsinjektion in Java mit Frühling und Guice
Aug 01, 2025 am 05:53 AM
Abhängigkeitsinjektion (DI) IsAdeSnIntWhereObjectSRecedEpendencieSexternally, PromotingLoosecoubingAneAtReTingThroughConstructor, Setter, Orfieldinjection.2springFrameworkuSaSNotations-@-Komponenten,@Service und@autowiredWithjava-basierte Konfitation
Vergleich von Java Frameworks: Spring Boot vs Quarkus gegen Micronaut
Aug 04, 2025 pm 12:48 PM
Pre-Formancetartuptimemoryusage, QuarkusandmicronautleadduToCompile-Time-foringandgraalvSupport, WithQuarkusofttenperformLightBetterin serverloser Szenarien.2. Thyvelopecosystem,
Java -Leistungsoptimierung und Profilerstellungstechniken
Jul 31, 2025 am 03:58 AM
Verwenden Sie Leistungsanalyse-Tools, um Engpässe zu lokalisieren, VisualVM oder JProfiler in der Entwicklung und Testphase zu verwenden und Async-Profiler in der Produktionsumgebung Priorität zu geben. 2. Reduzieren Sie die Objekterstellung, verwenden Sie Objekte wieder, verwenden Sie StringBuilder, um String -Spleißen zu ersetzen und entsprechende GC -Strategien auszuwählen. 3.. Optimieren Sie die Auswahl der Sammlung, wählen Sie die Anfangskapazität gemäß der Szene aus; V. 5. Tune JVM-Parameter, festlegen, angemessene Haufengröße und Müllsammler mit geringer Latenz einstellen und GC-Protokolle aktivieren; 6. Vermeiden Sie die Reflexion auf Codeebene, ersetzen Sie Wrapper -Klassen durch Grundtypen, Verzögerungsinitialisierung und verwenden Sie endgültige und statische. 7. Kontinuierliche Leistungstest und Überwachung, kombiniert mit JMH
Ein Entwicklerleitfaden für Java Project Management zu Maven für Java
Jul 30, 2025 am 02:41 AM
Maven ist ein Standardwerkzeug für Java -Projektmanagement und -aufbau. Die Antwort liegt in der Tatsache, dass Pom.xml verwendet wird, um Projektstruktur, Abhängigkeitsmanagement, Konstruktionslebenszyklusautomation und Plug-in-Erweiterungen zu standardisieren. 1. Verwenden Sie POM.xml, um Gruppen, Artefaktid, Version und Abhängigkeiten zu definieren; 2. Master -Kernbefehle wie MVNClean, Compile, Test, Paket, Installation und Bereitstellen; Fn. V. 5.
Verständnis der Java Virtual Machine (JVM) Interna
Aug 01, 2025 am 06:31 AM
ThejvMenablesJavas "Writeonce, Runanywhere" -CapabilityByexecutingByteCodethroughfourMainComponents: 1.TheClassloadersubStemLoads, Links, undinitializes


