You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
管理员 94f7d67452 . 3 months ago
..
docker update 重写ruoyi-mqtt模块 4 months ago
src/main . 3 months ago
README.md update 重写ruoyi-mqtt模块 4 months ago
pom.xml . 3 months ago

README.md

在物联网IoT和分布式系统中MQTTMessage Queuing Telemetry Transport是一种轻量级、基于发布/订阅模式的消息传输协议特别适合带宽有限、网络不稳定的场景。Spring Boot作为主流的Java开发框架通过集成MQTT客户端库如Eclipse Paho可以快速实现MQTT通信功能。

一、MQTT核心概念

在详解Spring Boot集成MQTT前需先了解几个核心概念

  • BrokerMQTT服务器如Eclipse Mosquitto、EMQX负责接收客户端发送的消息并转发给订阅者。
  • 客户端Client分为发布者Publisher和订阅者Subscriber同一客户端可同时扮演两种角色。
  • 主题Topic:消息的分类标识(如device/temp),支持层级结构和通配符(+匹配单级、#匹配多级)。
  • QoSQuality of Service消息传输质量等级分3级
    • QoS 0最多一次消息可能丢失不确认
    • QoS 1至少一次确保消息到达可能重复
    • QoS 2刚好一次确保消息唯一到达最可靠但开销大
  • 遗嘱消息Last Will and Testament客户端异常断开时Broker自动向指定主题发送的消息。

二、Spring Boot集成MQTT的核心依赖

Spring Boot本身不直接提供MQTT Starter通常通过集成Eclipse Paho MQTT客户端实现。需在pom.xml中添加依赖:

<!-- MQTT客户端核心依赖 -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

<!-- 可选Spring Integration MQTT更符合Spring编程模型 -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.15</version>
</dependency>

三、基础实现基于Paho客户端的MQTT通信

1. 配置MQTT连接参数

application.yml中配置Broker地址、客户端ID等信息

mqtt:
  broker-url: tcp://localhost:1883  # MQTT Broker地址TCP协议
  client-id: springboot-mqtt-client  # 客户端唯一ID避免重复可加随机数
  username: admin  # 可选Broker认证用户名
  password: 123456  # 可选Broker认证密码
  keep-alive: 60  # 心跳间隔(秒)
  default-topic: device/data  # 默认主题
  qos: 1  # 默认QoS等级

2. 配置MQTT客户端工厂

通过@Configuration创建MQTT客户端工厂和连接选项

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.keep-alive}")
    private int keepAlive;

    // 配置连接选项
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl}); // 支持多个Broker地址
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setKeepAliveInterval(keepAlive);
        options.setCleanSession(false); // 不清除会话(重连后保留订阅关系)

        // 配置遗嘱消息(可选)
        options.setWill("device/offline", "客户端断开连接".getBytes(), 1, false);
        return options;
    }

    // 创建MQTT客户端实例异步客户端推荐使用
    @Bean
    public MqttAsyncClient mqttAsyncClient() throws Exception {
        // MemoryPersistence消息临时存储在内存可选FilePersistence持久化到文件
        MqttAsyncClient client = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence());
        // 连接Broker
        client.connect(mqttConnectOptions()).waitForCompletion();
        return client;
    }
}

3. 实现消息发送Publisher

创建消息发送服务,封装发送逻辑:

import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MqttPublisher {

    @Autowired
    private MqttAsyncClient mqttClient;

    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    @Value("${mqtt.qos}")
    private int defaultQos;

    /**
     * 发送消息到默认主题
     */
    public void send(String payload) throws Exception {
        send(defaultTopic, payload, defaultQos, false);
    }

    /**
     * 发送消息到指定主题
     * @param topic 主题
     * @param payload 消息内容
     * @param qos QoS等级
     * @param retained 是否保留消息Broker存储最后一条保留消息新订阅者会立即收到
     */
    public void send(String topic, String payload, int qos, boolean retained) throws Exception {
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(qos);
        message.setRetained(retained);
        // 异步发送waitForCompletion()等待发送完成
        mqttClient.publish(topic, message).waitForCompletion();
    }
}

4. 实现消息接收Subscriber

通过MqttCallback接口监听消息和连接状态:

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

@Service
public class MqttSubscriber {

    @Autowired
    private MqttAsyncClient mqttClient;

    @Value("${mqtt.default-topic}")
    private String defaultTopic;

    @Value("${mqtt.qos}")
    private int defaultQos;

    // 初始化时订阅主题
    @PostConstruct
    public void subscribe() throws Exception {
        // 订阅默认主题指定QoS和消息监听器
        mqttClient.subscribe(defaultTopic, defaultQos, new IMqttMessageListener() {
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                String payload = new String(message.getPayload());
                System.out.println("收到消息:主题=" + topic + ",内容=" + payload);
                // 处理消息逻辑(如存入数据库、触发业务操作等)
            }
        }).waitForCompletion();
    }
}

四、高级用法基于Spring Integration MQTT

Spring Integration提供了更符合Spring风格的MQTT集成方式通过消息通道MessageChannel和注解简化开发。

1. 配置Integration MQTT

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttIntegrationConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    // 客户端工厂
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // 接收消息的通道
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    // 入站适配器(接收消息)
    @Bean
    public MessageProducer inbound() {
        // 订阅主题device/data客户端ID加后缀避免与发送端冲突
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "-inbound", mqttClientFactory(), "device/data");
        adapter.setConverter(new DefaultPahoMessageConverter()); // 消息转换器
        adapter.setQos(1); // QoS等级
        adapter.setOutputChannel(mqttInputChannel()); // 绑定到接收通道
        return adapter;
    }

    // 处理接收的消息(使用@ServiceActivator
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
            String payload = message.getPayload().toString();
            System.out.println("Integration接收消息主题=" + topic + ",内容=" + payload);
        };
    }

    // 发送消息的通道
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    // 出站适配器(发送消息)
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler outbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                clientId + "-outbound", mqttClientFactory());
        handler.setAsync(true); // 异步发送
        handler.setDefaultTopic("device/control"); // 默认发送主题
        handler.setDefaultQos(1); // 默认QoS
        return handler;
    }
}

2. 使用Integration发送消息

通过MessageChannel发送消息:

import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class IntegrationMqttPublisher {

    // 注入发送通道
    @Resource(name = "mqttOutputChannel")
    private MessageChannel mqttOutputChannel;

    public void send(String payload) {
        // 发送到默认主题
        mqttOutputChannel.send(MessageBuilder.withPayload(payload).build());
    }

    public void sendToTopic(String topic, String payload) {
        // 发送到指定主题通过header指定
        mqttOutputChannel.send(MessageBuilder.withPayload(payload)
                .setHeader("mqtt_topic", topic)
                .build());
    }
}

五、注意事项

  1. 客户端ID唯一性同一Broker下客户端ID不可重复否则会导致连接被强制断开可通过clientId + 随机数避免)。
  2. QoS选择根据业务可靠性要求选择QoS物联网场景常用QoS 1平衡可靠性和性能
  3. 断线重连Paho客户端默认支持重连可通过MqttConnectOptions.setAutomaticReconnect(true)增强重连逻辑。
  4. 消息持久化:若需避免消息丢失,可使用FilePersistence替代MemoryPersistence,并设置cleanSession=false
  5. 主题设计:合理规划主题层级(如device/{设备ID}/temp),便于管理和订阅。

通过以上方式Spring Boot可快速集成MQTT实现消息的发布与订阅适用于物联网设备通信、分布式系统通知等场景。根据业务复杂度可选择基础Paho客户端或Spring Integration简化开发。