Maison > Java > javaDidacticiel > Comment Springboot intègre mqtt

Comment Springboot intègre mqtt

PHPz
Libérer: 2023-05-15 16:25:13
avant
1862 Les gens l'ont consulté

springboot intègre mqtt

Si vous utilisez un cluster, n'oubliez pas d'ouvrir les ports suivants lors de la configuration :

Comment Springboot intègre mqtt

D'accord, l'étape suivante consiste à connecter notre programme Java à mqtt. en fait, il y en a plus de deux) pour se connecter.

La première consiste à utiliser directement la bibliothèque client Java MQTT

La seconde consiste à utiliser spring Integration mqtt qui est également plus recommandé, et ce c'est ce sur quoi nous nous concentrons. spring integration mqtt也是比较推荐的一种,也是我们主讲这种.

第一步 添加 maven dependency

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.14</version>
        </dependency>
Copier après la connexion

第二步 添加配置

1 先写好一些基本配置

mqtt:
 username: test                        # 账号
 password: 123456                      # 密码
 host-url: tcp://127.0.0.1:1883        # mqtt连接tcp地址
 in-client-id: ${random.value}         # 随机值,使出入站 client ID 不同
 out-client-id: ${random.value}
 client-id: ${random.int}                   # 客户端Id,不能相同,采用随机数 ${random.value}
 default-topic: test/#,topic/+/+/up         # 默认主题
 timeout: 60                                # 超时时间
 keepalive: 60                              # 保持连接
 clearSession: true                         # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
Copier après la connexion

2.然后写一个对应的类MqttProperties

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * MqttProperties 
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Component
public class MqttProperties {

    /**
     * 用户名
     */
    @Value("${mqtt.username}")
    private String username;

    /**
     * 密码
     */
    @Value("${mqtt.password}")
    private String password;

    /**
     * 连接地址
     */
    @Value("${mqtt.host-url}")
    private String hostUrl;

    /**
     * 进-客户Id
     */
    @Value("${mqtt.in-client-id}")
    private String inClientId;

    /**
     * 出-客户Id
     */
    @Value("${mqtt.out-client-id}")
    private String outClientId;

    /**
     * 客户Id
     */
    @Value("${mqtt.client-id}")
    private String clientId;

    /**
     * 默认连接话题
     */
    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    /**
     * 超时时间
     */
    @Value("${mqtt.timeout}")
    private int timeout;

    /**
     * 保持连接数
     */
    @Value("${mqtt.keepalive}")
    private int keepalive;

    /**是否清除session*/
    @Value("${mqtt.clearSession}")
    private boolean clearSession;

	// ...getter and setter

}
Copier après la connexion

接下来就是配置一些乱七八糟的东西, 这里有很多概念性的东西 比如 管道channel, 适配器 adapter, 入站Inbound, 出站Outbound,等等等等, 看起来是非常头痛的

好吧,那就一个一个来,

首先连接mqtt需要一个客户端, 那么我们就开一个客户端工厂, 这里可以产生很多很多的客户端

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
Copier après la connexion

然后再搞两根管子(channel),一个出站,一个入站

    //出站消息管道,
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }

    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
Copier après la connexion

为了使这些管子能流通 就需要一个适配器(adapter)

    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }
Copier après la connexion

然后定义消息生产者

    // 消息生产者
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //入站投递的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }
Copier après la connexion

那我们收到消息去哪里处理呢,答案是这里:

    @Bean
    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
    	// 这个 mqttMessageHandle 其实就是一个 MessageHandler 的实现类(这个类我放下面)
        return mqttMessageHandle;
		// 你也可以这样写
//        return new MessageHandler() {
//            @Override
//            public void handleMessage(Message<?> message) throws MessagingException {
//                // do something
//            }
//        };
Copier après la connexion

到这里我们其实已经可以接受到来自mqtt的消息了

接下来配置向mqtt发送消息

配置 出站处理器

    // 出站处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }
Copier après la connexion

这个 出站处理器 在我看来就是让别人 (MqttPahoMessageHandler)处理了, 我就不处理了,我只管我要发送什么,至于怎么发送,由MqttPahoMessageHandler来完成

接下来我们定义一个接口即可

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * MqttGateway
 *
 * @author hengzi
 * @date 2022/8/23
 */

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);
}
Copier après la connexion

我们直接调用这个接口就可以向mqtt 发送数据

到目前为止,整个配置文件长这样:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * MqttConfig
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Configuration
public class MqttConfig {


    /**
     *  以下属性将在配置文件中读取
     **/
    @Autowired
    private MqttProperties mqttProperties;


    //Mqtt 客户端工厂
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }


    // 消息生产者
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //入站投递的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }


    // 出站处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }

    @Bean
    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        return mqttMessageHandle;
    }

    //出站消息管道,
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }


    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
}
Copier après la connexion

处理消息的 MqttMessageHandle

@Component
public class MqttMessageHandle implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
     
    }
}
Copier après la connexion

在进一步了解之后,发现可以优化的地方,比如channel 的类型是有很多种的, 这里使用的DirectChannel,是Spring Integration默认的消息通道,它将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,传输方式都是同步的方式,都是由一个线程来运行的.

这里我们可以将入站channel改成 ExecutorChannel一个可以使用多线程的channel

@Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可创建的线程数
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心线程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 队列最大长度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 线程池维护线程所允许的空闲时间
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        // 用线程池
        return new ExecutorChannel(mqttThreadPoolTaskExecutor());
    }
Copier après la connexion

到这里其实可以运行了.

但是这样配置其实还是有点多, 有点乱, 于是我查找官网, f发现一种更简单的配置方法 叫 Java DSL

我们参考官网,稍微改一下,使用 DSL的方式进行配置:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * MqttConfigV2
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Configuration
public class MqttConfigV2 {

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private MqttMessageHandle mqttMessageHandle;


    //Mqtt 客户端工厂 所有客户端从这里产生
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }

    // 消息生产者 (接收,处理来自mqtt的消息)
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        return IntegrationFlows.from( adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }

    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可创建的线程数
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心线程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 队列最大长度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 线程池维护线程所允许的空闲时间
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // 出站处理器 (向 mqtt 发送消息)
    @Bean
    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {

        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();
    }

}
Copier après la connexion

这样看起来真的简单多了, 头也没那么大了, 我要是早知道多好.

好了以上就是配置相关的, 到这里其实是已经完成springboot 与 mqtt 的整合了.

但其实我一直有个想法, 就是我们接收的消息 都是在 handleMessage这个方法里面执行的,

	@Override
    public void handleMessage(Message<?> message) throws MessagingException {
     			
    }
Copier après la connexion

所以我就有了一个想法, 能不能根据 我订阅的主题,在不同的方法执行, 对于这个问题,其实你用if ... else ...也能实现, 但很明显,如果我订阅的主题很多的话, 那写起来就很头痛了.

对于这个问题,有两种思路, 一个是添加Spring Integration的路由 router,根据不同topic路由到不同的channel, 这个我也知道能不能实现, 我这里就不讨论了.

第二种是, 我也不知道名字改如何叫, 我是参考了 spring@Controller的设计, 暂且叫他注解模式.

众所周知,我们的接口都是在类上加 @Controller这个注解, 就代表这个类是 http 接口, 再在方法加上 @RequestMapping就能实现不同的 url 调用不同的方法.

参数这个设计 我们在类上面加 @MqttService就代表这个类是专门处理mqtt消息的服务类
同时 在这个类的方法上 加上 @MqttTopic就代表 这个主题由这个方法处理.

OK, 理论有了,接下来就是 实践.

先定义 两个注解

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;

import java.lang.annotation.*;

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {

    @AliasFor(
            annotation = Component.class
    )
    String value() default "";
}
Copier après la connexion

加上 @Component注解 spring就会扫描, 并注册到IOC容器里

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {

    /**
     * 主题名字
     */
    String value() default "";

}
Copier après la connexion

参考 @RequestMapping我们使用起来应该是这样的:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/**
 * MqttTopicHandle
 *
 * @author hengzi
 * @date 2022/8/24
 */
@MqttService
public class MqttTopicHandle {

    public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);

	// 这里的 # 号是通配符
    @MqttTopic("test/#")
    public void test(Message<?> message){
        log.info("test="+message.getPayload());
    }
	
	// 这里的 + 号是通配符
    @MqttTopic("topic/+/+/up")
    public void up(Message<?> message){
        log.info("up="+message.getPayload());
    }

	// 注意 你必须先订阅
    @MqttTopic("topic/1/2/down")
    public void down(Message<?> message){
        log.info("down="+message.getPayload());
    }
}
Copier après la connexion

OK 接下来就是实现这样的使用

分析 :

当我们收到消息时, 我们从IOC容器中 找到所有 带 @MqttService注解的类

然后 遍历这些类, 找到带有 @MqttTopic的方法

接着 把 @MqttTopic注解的的值 与 接受到的topic 进行对比

如果一致则执行这个方法

废话少说, 上代码

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

/**
 * MessageHandleService
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Component
public class MqttMessageHandle implements MessageHandler {

    public static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class);

    // 包含 @MqttService注解 的类(Component)
    public static Map<String, Object> mqttServices;


    /**
     * 所有mqtt到达的消息都会在这里处理
     * 要注意这个方法是在线程池里面运行的
     * @param message message
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        getMqttTopicService(message);
    }

    public Map<String, Object> getMqttServices(){
        if(mqttServices==null){
            mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);
        }
        return mqttServices;
    }

    public void getMqttTopicService(Message<?> message){
        // 在这里 我们根据不同的 主题 分发不同的消息
        String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);
        if(receivedTopic==null || "".equals(receivedTopic)){
            return;
        }
        for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){
        	// 把所有带有 @MqttService 的类遍历
            Class<?> clazz = entry.getValue().getClass();
            // 获取他所有方法
            Method[] methods = clazz.getDeclaredMethods();
            for ( Method method: methods ){
                if (method.isAnnotationPresent(MqttTopic.class)){
                	// 如果这个方法有 这个注解
                    MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
                    if(isMatch(receivedTopic,handleTopic.value())){
                    	// 并且 这个 topic 匹配成功
                        try {
                            method.invoke(SpringUtils.getBean(clazz),message);
                            return;
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                            log.error("代理炸了");
                        } catch (InvocationTargetException e) {
                            log.error("执行 {} 方法出现错误",handleTopic.value(),e);
                        }
                    }
                }
            }
        }
    }


    /**
     * mqtt 订阅的主题与我实际的主题是否匹配
     * @param topic 是实际的主题
     * @param pattern 是我订阅的主题 可以是通配符模式
     * @return 是否匹配
     */
    public static boolean isMatch(String topic, String pattern){

        if((topic==null) || (pattern==null) ){
            return false;
        }

        if(topic.equals(pattern)){
            // 完全相等是肯定匹配的
            return true;
        }

        if("#".equals(pattern)){
            // # 号代表所有主题  肯定匹配的
            return true;
        }
        String[] splitTopic = topic.split("/");
        String[] splitPattern = pattern.split("/");

        boolean match = true;

        // 如果包含 # 则只需要判断 # 前面的
        for (int i = 0; i < splitPattern.length; i++) {
            if(!"#".equals(splitPattern[i])){
                // 不是# 号 正常判断
                if(i>=splitTopic.length){
                    // 此时长度不相等 不匹配
                    match = false;
                    break;
                }
                if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){
                    // 不相等 且不等于 +
                    match = false;
                    break;
                }
            }
            else {
                // 是# 号  肯定匹配的
                break;
            }
        }

        return match;
    }

}
Copier après la connexion

工具类 SpringUtils

Partie 1 Une étape pour ajouter une dépendance maven🎜
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * spring工具类 方便在非spring管理环境中获取bean
 * 
 */
@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware 
{
    /** Spring应用上下文环境 */
    private static ConfigurableListableBeanFactory beanFactory;

    private static ApplicationContext applicationContext;


    public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{

        return beanFactory.getBeansWithAnnotation(clsName);
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException 
    {
        SpringUtils.beanFactory = beanFactory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
    {
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * 获取对象
     *
     * @param name
     * @return Object 一个以所给名字注册的bean的实例
     * @throws org.springframework.beans.BeansException
     *
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException
    {
        return (T) beanFactory.getBean(name);
    }

    /**
     * 获取类型为requiredType的对象
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     *
     */
    public static <T> T getBean(Class<T> clz) throws BeansException
    {
        T result = (T) beanFactory.getBean(clz);
        return result;
    }

    /**
     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name)
    {
        return beanFactory.containsBean(name);
    }

    /**
     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注册对象的类型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getType(name);
    }

    /**
     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getAliases(name);
    }

    /**
     * 获取aop代理对象
     * 
     * @param invoker
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker)
    {
        return (T) AopContext.currentProxy();
    }

    /**
     * 获取当前的环境配置,无配置返回null
     *
     * @return 当前的环境配置
     */
    public static String[] getActiveProfiles()
    {
        return applicationContext.getEnvironment().getActiveProfiles();
    }

}
Copier après la connexion
Copier après la connexion
🎜La deuxième étape consiste à ajouter la configuration🎜🎜1 Écrivez d'abord une configuration de base🎜
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;

import java.util.Arrays;

/**
 * MqttService
 *
 * @author hengzi
 * @date 2022/8/25
 */
@Service
public class MqttService {

    @Autowired
    private MqttPahoMessageDrivenChannelAdapter adapter;


    public void addTopic(String topic) {
        addTopic(topic, 1);
    }

    public void addTopic(String topic,int qos) {
        String[] topics = adapter.getTopic();
        if(!Arrays.asList(topics).contains(topic)){
            adapter.addTopic(topic,qos);
        }
    }

    public void removeTopic(String topic) {
        adapter.removeTopic(topic);
    }

}
Copier après la connexion
Copier après la connexion
🎜2 Ensuite, écrivez une classe correspondante MqttProperties</. code>🎜rrreee🎜L'étape suivante consiste à configurer certains éléments désordonnés. Il y a beaucoup de choses conceptuelles ici telles que le canal <code>channel, l'adaptateur adaptateur, le Inbound entrant. , Outbound code> sortant, etc., cela semble être un très gros casse-tête🎜🎜D'accord, faisons-le un par un,🎜🎜Tout d'abord, la connexion à mqtt nécessite un client, puis nous le ferons ouvrir une usine client, qui peut en produire beaucoup, beaucoup. Le client 🎜rrreee🎜 crée ensuite deux tuyaux (channel), un sortant et un entrant🎜rrreee🎜Afin de faire circuler ces tuyaux, un adaptateur ( un adaptateur est nécessaire code>)🎜rrreee🎜Ensuite, définissez le producteur du message🎜rrreee🎜Alors où traitons-nous les messages que nous recevons La réponse est ici :🎜rrreee🎜À ce stade, nous pouvons réellement recevoir des messages ? depuis mqtt🎜🎜Ensuite, configurez la direction dans laquelle mqtt envoie les messages 🎜🎜 Configurez le processeur sortant 🎜rrreee🎜 À mon avis, ce processeur sortant est géré par d'autres (MqttPahoMessageHandler), donc je ne le traiterai pas , je me soucie juste de ce que je veux envoyer, quant à la façon d'envoyer, cela se fait par MqttPahoMessageHandler🎜🎜Ensuite, nous définissons une interface🎜rrreee🎜Nous pouvons appeler directement cette interface pour envoyer des données à mqtt 🎜🎜Jusqu'à présent, l'ensemble du fichier de configuration est long. Comme ceci : 🎜rrreee🎜MqttMessageHandle 🎜rrreee🎜Après une meilleure compréhension, j'ai trouvé des zones qui peuvent être optimisées. Par exemple, il existe de nombreux types de canaux. Le DirectChannel</code utilisé ici est >, est le canal de message par défaut de <code>Spring Integration. Il envoie le message à un abonné, puis bloque l'envoi jusqu'à ce que le message soit reçu. les méthodes sont toutes synchrones et exécutées par un thread. .🎜🎜Ici, nous pouvons changer le channel entrant en ExecutorChannel un channel qui peut utiliser plusieurs- threading🎜rrreee🎜Il peut en fait fonctionner ici 🎜🎜Mais cette configuration est en fait un peu trop lourde et un peu compliquée, alors j'ai cherché sur le site officiel et trouvé une méthode de configuration plus simple appelée Java DSL🎜🎜Nous reportez-vous au site officiel, modifiez-le légèrement et utilisez la configuration DSL :🎜rrreee🎜Cela a l'air vraiment beaucoup plus simple, et la tête n'est pas si grosse que j'aurais aimé le savoir plus tôt.🎜🎜D'accord, ce qui précède est lié à. configuration. À ce stade, springboot et mqtt sont intégrés.🎜🎜Mais en fait, j'ai toujours eu une idée, c'est-à-dire que les messages que nous recevons sont tous exécutés dans la méthode handleMessage,🎜rrreee🎜Donc je avoir une idée, est-ce possible ? Selon les sujets auxquels je m'abonne, il est exécuté selon différentes méthodes. Pour ce problème, vous pouvez en fait utiliser if ... else ... pour y parvenir, mais évidemment. , s'il y a de nombreux sujets auxquels je suis abonné, ce serait un casse-tête à écrire 🎜🎜Il y a deux idées pour ce problème. La première consiste à ajouter le routage routeur de Spring Integration</. code>, et l'itinéraire vers différents <code>selon différents sujets. code>canal, je sais aussi si cela peut être réalisé, donc je n'en discuterai pas ici 🎜🎜La seconde est que je ne le fais pas. Je ne sais pas comment changer le nom, je me réfère à spring</code >La conception de <code>@Controller est appelée mode annotation pour le moment Comme nous le savons tous, nos interfaces ajoutent l'annotation. @Controller à la classe, ce qui signifie que la classe est une interface http, puis ajouter @RequestMapping à la méthode peut réaliser différentes URL appelant différentes méthodes 🎜🎜Pour la conception des paramètres. , nous ajoutons @MqttService à la classe pour représenter cela. La classe est une classe de service spécialisée dans le traitement des messages mqtt
En même temps, en ajoutant @MqttTopic à la méthode de cette classe signifie que le sujet est traité par cette méthode. 🎜🎜OK, la théorie est là, ensuite juste de la pratique.🎜🎜Définissez d'abord deux annotations🎜rrreee🎜Ajoutez l'annotation @Component et spring le scannera et l'enregistrera dans le conteneur IOC🎜rrreee🎜Référez-vous à @RequestMapping nous. Il doit être utilisé comme ceci :🎜rrreee🎜OK La prochaine étape consiste à mettre en œuvre une telle utilisation🎜🎜Analyse :🎜 🎜Lorsque nous recevons le message, nous trouvons toutes les annotations avec @MqttService de la classe du conteneur IOC🎜🎜Puis parcourons ces classes et trouvons la méthode avec @MqttTopic🎜🎜Ensuite comparez la valeur de l'annotation @MqttTopic avec le sujet reçu🎜🎜S'ils sont cohérents Ensuite, exécutez cette méthode🎜🎜Arrêtez de dire des bêtises, passons au code🎜rrreee🎜Classe d'outils SpringUtils < /code>🎜<div class="code" style="position:relative; padding:0px; margin:0px;"><div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">import org.springframework.aop.framework.AopContext; import org.springframework.beans.BeansException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.util.Map; /** * spring工具类 方便在非spring管理环境中获取bean * */ @Component public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware { /** Spring应用上下文环境 */ private static ConfigurableListableBeanFactory beanFactory; private static ApplicationContext applicationContext; public static Map&lt;String, Object&gt; getBeansByAnnotation(Class clsName) throws BeansException{ return beanFactory.getBeansWithAnnotation(clsName); } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringUtils.beanFactory = beanFactory; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringUtils.applicationContext = applicationContext; } /** * 获取对象 * * @param name * @return Object 一个以所给名字注册的bean的实例 * @throws org.springframework.beans.BeansException * */ @SuppressWarnings(&quot;unchecked&quot;) public static &lt;T&gt; T getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } /** * 获取类型为requiredType的对象 * * @param clz * @return * @throws org.springframework.beans.BeansException * */ public static &lt;T&gt; T getBean(Class&lt;T&gt; clz) throws BeansException { T result = (T) beanFactory.getBean(clz); return result; } /** * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true * * @param name * @return boolean */ public static boolean containsBean(String name) { return beanFactory.containsBean(name); } /** * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException) * * @param name * @return boolean * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */ public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { return beanFactory.isSingleton(name); } /** * @param name * @return Class 注册对象的类型 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */ public static Class&lt;?&gt; getType(String name) throws NoSuchBeanDefinitionException { return beanFactory.getType(name); } /** * 如果给定的bean名字在bean定义中有别名,则返回这些别名 * * @param name * @return * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */ public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { return beanFactory.getAliases(name); } /** * 获取aop代理对象 * * @param invoker * @return */ @SuppressWarnings(&quot;unchecked&quot;) public static &lt;T&gt; T getAopProxy(T invoker) { return (T) AopContext.currentProxy(); } /** * 获取当前的环境配置,无配置返回null * * @return 当前的环境配置 */ public static String[] getActiveProfiles() { return applicationContext.getEnvironment().getActiveProfiles(); } }</pre><div class="contentsignin">Copier après la connexion</div></div><div class="contentsignin">Copier après la connexion</div></div><p>OK, 大功告成. 终于舒服了, 终于不用写<code>if...else...了, 个人感觉这样处理起来会更加优雅. 写代码最重要是什么, 是优雅~

以上!

参考文章:

  • 使用 Spring integration 在Springboot中集成Mqtt

  • Spring Integration(一)概述

附:

动态添加主题方式:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;

import java.util.Arrays;

/**
 * MqttService
 *
 * @author hengzi
 * @date 2022/8/25
 */
@Service
public class MqttService {

    @Autowired
    private MqttPahoMessageDrivenChannelAdapter adapter;


    public void addTopic(String topic) {
        addTopic(topic, 1);
    }

    public void addTopic(String topic,int qos) {
        String[] topics = adapter.getTopic();
        if(!Arrays.asList(topics).contains(topic)){
            adapter.addTopic(topic,qos);
        }
    }

    public void removeTopic(String topic) {
        adapter.removeTopic(topic);
    }

}
Copier après la connexion
Copier après la connexion

直接调用就行

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:yisu.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal