
How does RocketMQ implement message sending and receiving in Springboot?

WBOY
Release: 2023-05-18 17:19:06

Simple message sending and receiving with Spring Boot and RocketMQ

There are three ways to send ordinary messages: one-way sending, synchronous sending and asynchronous sending.
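The difference between the three modes can be sketched with plain JDK classes. This is only a toy model: `FakeBroker` and its methods are hypothetical stand-ins, not RocketMQ classes, and the "acknowledgement" is just a string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy model of the three send modes; nothing here is part of the RocketMQ API. */
public class SendModesSketch {

    /** Hypothetical broker stand-in: "stores" a message and acknowledges it. */
    static class FakeBroker {
        private final ExecutorService io = Executors.newSingleThreadExecutor();

        // Synchronous: the caller blocks until the acknowledgement arrives.
        String syncSend(String msg) {
            return asyncSend(msg).join();
        }

        // Asynchronous: the caller gets a future immediately and is notified later.
        CompletableFuture<String> asyncSend(String msg) {
            return CompletableFuture.supplyAsync(() -> "ACK:" + msg, io);
        }

        // One-way: fire and forget; no result is ever reported to the caller.
        void sendOneWay(String msg) {
            io.execute(() -> { /* message handed off, outcome discarded */ });
        }

        void shutdown() {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        System.out.println(broker.syncSend("sync"));
        broker.asyncSend("async").thenAccept(System.out::println).join();
        broker.sendOneWay("one-way");
        broker.shutdown();
    }
}
```

Synchronous sending suits messages whose delivery result matters to the caller, asynchronous sending avoids blocking while still reporting the result, and one-way sending trades any confirmation for the lowest overhead.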

The steps below integrate RocketMQ with Spring Boot to send and receive ordinary messages.

  • Create a Spring Boot project and add the RocketMQ dependency:

<!-- RocketMQ dependency -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
  • Configure RocketMQ:

# Port
server:
  port: 8083

# Configure RocketMQ
rocketmq:
  name-server: 127.0.0.1:9876
  # Producer
  producer:
    # Producer group name; it must be unique within an application
    group: group1
    # Timeout for sending a message, in milliseconds (default 3000)
    send-message-timeout: 3000
    # Message bodies of at least this many bytes are compressed (default 4096)
    compress-message-body-threshold: 4096
    # Maximum message size in bytes (4194304 bytes = 4 MB)
    max-message-size: 4194304
    # Number of retries when a synchronous send fails
    retry-times-when-send-failed: 3
    # Whether to retry on another broker when a send fails internally; only takes effect with multiple brokers
    retry-next-server: true
    # Number of retries when an asynchronous send fails
    retry-times-when-send-async-failed: 3
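
To make the two size settings concrete, here is a minimal sketch of how a body length relates to them. The class, the enum, and the exact boundary handling are assumptions for illustration; this is not the RocketMQ client's own code.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative only: classifies a message body against the configured sizes. */
public class MessageSizeSketch {
    static final int COMPRESS_THRESHOLD = 4096;   // compress-message-body-threshold
    static final int MAX_MESSAGE_SIZE = 4194304;  // max-message-size (4 MB)

    enum Handling { SEND_AS_IS, COMPRESS, REJECT }

    static Handling classify(byte[] body) {
        // Assumption: bodies larger than the maximum are refused outright.
        if (body.length > MAX_MESSAGE_SIZE) {
            return Handling.REJECT;
        }
        // Assumption: compression starts once the body reaches the threshold.
        if (body.length >= COMPRESS_THRESHOLD) {
            return Handling.COMPRESS;
        }
        return Handling.SEND_AS_IS;
    }

    public static void main(String[] args) {
        byte[] small = "test-message".getBytes(StandardCharsets.UTF_8);
        System.out.println(classify(small));
        System.out.println(classify(new byte[8192]));
    }
}
```

The sizes are measured in bytes of the encoded body, so a string with multi-byte characters can cross the threshold sooner than its character count suggests.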

  • Create a new controller to send messages:

package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Three ways to send ordinary messages: synchronous, asynchronous, and one-way
 * @author qzz
 */
@RestController
public class RocketMQController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * Send an ordinary message
     * convertAndSend(String destination, Object payload) is convenient for sending strings
     */
    @RequestMapping("/send")
    public void send(){
        rocketMQTemplate.convertAndSend("test-topic","test-message");
    }
    /**
     * Send a synchronous message
     */
    @RequestMapping("/testSyncSend")
    public void testSyncSend(){
        // Argument 1: topic; to add a tag, use the "topic:tag" form
        // Argument 2: message content
        SendResult sendResult = rocketMQTemplate.syncSend("test-topic","sync message test");
        System.out.println(sendResult);
    }
    /**
     * Send an asynchronous message
     */
    @RequestMapping("/testASyncSend")
    public void testASyncSend(){
        // Argument 1: topic; to add a tag, use the "topic:tag" form
        // Argument 2: message content
        // Argument 3: callback invoked with the send result
        rocketMQTemplate.asyncSend("test-topic", "async message test", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("Message sending failed");
                throwable.printStackTrace();
            }
        });
    }
    /**
     * Send a one-way message
     */
    @RequestMapping("/testOneWay")
    public void testOneWay(){
        // Argument 1: topic; to add a tag, use the "topic:tag" form
        // Argument 2: message content
        rocketMQTemplate.sendOneWay("test-topic","one-way message test");
    }
}

The rocketmq-spring-boot-starter provides the RocketMQTemplate class, which can send messages in each of these forms.

Every send method in the controller targets the topic test-topic.
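
The controller's comments mention that a tag can be attached by writing the destination as "topic:tag". The sketch below shows one way such a string could be split into its two parts; `DestinationSketch` and `Destination` are hypothetical names, and this is not the starter's actual parsing code.

```java
/** Illustrative parser for the "topic:tag" destination form. */
public class DestinationSketch {

    /** Holds the two parts of a destination; tag is empty when none was given. */
    static final class Destination {
        final String topic;
        final String tag;

        Destination(String topic, String tag) {
            this.topic = topic;
            this.tag = tag;
        }
    }

    static Destination parse(String destination) {
        // Split on the first colon only, so at most two parts are produced.
        String[] parts = destination.split(":", 2);
        String tag = parts.length > 1 ? parts[1] : "";
        return new Destination(parts[0], tag);
    }

    public static void main(String[] args) {
        Destination d = parse("test-topic:tagA");
        System.out.println(d.topic + " / " + d.tag);
    }
}
```

With this form, a call such as `syncSend("test-topic:tagA", ...)` would address the same topic as before while labelling the message with a tag (`tagA` is a made-up example value).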

  • Create a message consumer, RocketMQConsumerListener, that listens for messages and consumes them:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * Consumes messages
 * Configures the RocketMQ listener
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test",topic = "test-topic")
public class RocketMQConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("Consumed message: "+s);
    }
}

The consumer class must implement the RocketMQListener interface; its type parameter specifies the message type, String in this case.

The @RocketMQMessageListener annotation must be added to the class, specifying the topic (test-topic) and the consumer group (test).
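
The relationship between a topic and a typed listener can be sketched in memory with plain JDK classes. `Listener` and `InMemoryTopics` are hypothetical stand-ins for illustration; a real broker adds networking, consumer groups, and persistence that this toy model ignores.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of topic-based delivery to a typed listener. */
public class ListenerSketch {

    /** Same shape as a typed listener: T is the message type handed to onMessage. */
    interface Listener<T> {
        void onMessage(T message);
    }

    /** Hypothetical in-memory stand-in for a broker that routes by topic name. */
    static class InMemoryTopics {
        private final Map<String, List<Listener<String>>> byTopic = new HashMap<>();

        void subscribe(String topic, Listener<String> listener) {
            byTopic.computeIfAbsent(topic, k -> new ArrayList<>()).add(listener);
        }

        void publish(String topic, String message) {
            // Only listeners subscribed to this exact topic receive the message.
            List<Listener<String>> listeners =
                    byTopic.getOrDefault(topic, Collections.emptyList());
            for (Listener<String> listener : listeners) {
                listener.onMessage(message);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryTopics topics = new InMemoryTopics();
        topics.subscribe("test-topic", s -> System.out.println("Consumed message: " + s));
        topics.publish("test-topic", "test-message");
        topics.publish("other-topic", "ignored");
    }
}
```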

Simple message sending and receiving is completed!

  • Start the service and test message consumption

[Screenshots of the test run]

Test synchronous message:

[Screenshot]

Test asynchronous message:

[Screenshot]

Test one-way message:

[Screenshot]

source:yisu.com