|
|
在物联网(IoT)和分布式系统中,MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布/订阅模式的消息传输协议,特别适合带宽有限、网络不稳定的场景。Spring Boot作为主流的Java开发框架,通过集成MQTT客户端库(如Eclipse Paho)可以快速实现MQTT通信功能。
|
|
|
|
|
|
|
|
|
### 一、MQTT核心概念
|
|
|
在详解Spring Boot集成MQTT前,需先了解几个核心概念:
|
|
|
- **Broker**:MQTT服务器(如Eclipse Mosquitto、EMQX),负责接收客户端发送的消息并转发给订阅者。
|
|
|
- **客户端(Client)**:分为发布者(Publisher)和订阅者(Subscriber),同一客户端可同时扮演两种角色。
|
|
|
- **主题(Topic)**:消息的分类标识(如`device/temp`),支持层级结构和通配符(`+`匹配单级、`#`匹配多级)。
|
|
|
- **QoS(Quality of Service)**:消息传输质量等级,分3级:
|
|
|
- QoS 0:最多一次(消息可能丢失,不确认)。
|
|
|
- QoS 1:至少一次(确保消息到达,可能重复)。
|
|
|
- QoS 2:刚好一次(确保消息唯一到达,最可靠但开销大)。
|
|
|
- **遗嘱消息(Last Will and Testament)**:客户端异常断开时,Broker自动向指定主题发送的消息。
|
|
|
|
|
|
|
|
|
### 二、Spring Boot集成MQTT的核心依赖
|
|
|
Spring Boot本身不直接提供MQTT Starter,通常通过集成**Eclipse Paho MQTT客户端**实现。需在`pom.xml`中添加依赖:
|
|
|
|
|
|
```xml
|
|
|
<!-- MQTT客户端核心依赖 -->
|
|
|
<dependency>
|
|
|
<groupId>org.eclipse.paho</groupId>
|
|
|
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
|
|
<version>1.2.5</version>
|
|
|
</dependency>
|
|
|
|
|
|
<!-- 可选:Spring Integration MQTT(更符合Spring编程模型) -->
|
|
|
<dependency>
|
|
|
<groupId>org.springframework.integration</groupId>
|
|
|
<artifactId>spring-integration-mqtt</artifactId>
|
|
|
<version>5.5.15</version>
|
|
|
</dependency>
|
|
|
```
|
|
|
|
|
|
|
|
|
### 三、基础实现:基于Paho客户端的MQTT通信
|
|
|
#### 1. 配置MQTT连接参数
|
|
|
在`application.yml`中配置Broker地址、客户端ID等信息:
|
|
|
|
|
|
```yaml
|
|
|
mqtt:
|
|
|
broker-url: tcp://localhost:1883 # MQTT Broker地址(TCP协议)
|
|
|
client-id: springboot-mqtt-client # 客户端唯一ID(避免重复,可加随机数)
|
|
|
username: admin # 可选:Broker认证用户名
|
|
|
password: 123456 # 可选:Broker认证密码
|
|
|
keep-alive: 60 # 心跳间隔(秒)
|
|
|
default-topic: device/data # 默认主题
|
|
|
qos: 1 # 默认QoS等级
|
|
|
```
|
|
|
|
|
|
|
|
|
#### 2. 配置MQTT客户端工厂
|
|
|
通过`@Configuration`创建MQTT客户端工厂和连接选项:
|
|
|
|
|
|
```java
|
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.context.annotation.Bean;
|
|
|
import org.springframework.context.annotation.Configuration;
|
|
|
|
|
|
@Configuration
|
|
|
public class MqttConfig {
|
|
|
|
|
|
@Value("${mqtt.broker-url}")
|
|
|
private String brokerUrl;
|
|
|
|
|
|
@Value("${mqtt.client-id}")
|
|
|
private String clientId;
|
|
|
|
|
|
@Value("${mqtt.username}")
|
|
|
private String username;
|
|
|
|
|
|
@Value("${mqtt.password}")
|
|
|
private String password;
|
|
|
|
|
|
@Value("${mqtt.keep-alive}")
|
|
|
private int keepAlive;
|
|
|
|
|
|
// 配置连接选项
|
|
|
@Bean
|
|
|
public MqttConnectOptions mqttConnectOptions() {
|
|
|
MqttConnectOptions options = new MqttConnectOptions();
|
|
|
options.setServerURIs(new String[]{brokerUrl}); // 支持多个Broker地址
|
|
|
options.setUserName(username);
|
|
|
options.setPassword(password.toCharArray());
|
|
|
options.setKeepAliveInterval(keepAlive);
|
|
|
options.setCleanSession(false); // 不清除会话(重连后保留订阅关系)
|
|
|
|
|
|
// 配置遗嘱消息(可选)
|
|
|
options.setWill("device/offline", "客户端断开连接".getBytes(), 1, false);
|
|
|
return options;
|
|
|
}
|
|
|
|
|
|
// 创建MQTT客户端实例(异步客户端,推荐使用)
|
|
|
@Bean
|
|
|
public MqttAsyncClient mqttAsyncClient() throws Exception {
|
|
|
// MemoryPersistence:消息临时存储在内存(可选:FilePersistence持久化到文件)
|
|
|
MqttAsyncClient client = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence());
|
|
|
// 连接Broker
|
|
|
client.connect(mqttConnectOptions()).waitForCompletion();
|
|
|
return client;
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
|
|
|
#### 3. 实现消息发送(Publisher)
|
|
|
创建消息发送服务,封装发送逻辑:
|
|
|
|
|
|
```java
|
|
|
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
@Service
|
|
|
public class MqttPublisher {
|
|
|
|
|
|
@Autowired
|
|
|
private MqttAsyncClient mqttClient;
|
|
|
|
|
|
@Value("${mqtt.default-topic}")
|
|
|
private String defaultTopic;
|
|
|
|
|
|
@Value("${mqtt.qos}")
|
|
|
private int defaultQos;
|
|
|
|
|
|
/**
|
|
|
* 发送消息到默认主题
|
|
|
*/
|
|
|
public void send(String payload) throws Exception {
|
|
|
send(defaultTopic, payload, defaultQos, false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 发送消息到指定主题
|
|
|
* @param topic 主题
|
|
|
* @param payload 消息内容
|
|
|
* @param qos QoS等级
|
|
|
* @param retained 是否保留消息(Broker存储最后一条保留消息,新订阅者会立即收到)
|
|
|
*/
|
|
|
public void send(String topic, String payload, int qos, boolean retained) throws Exception {
|
|
|
MqttMessage message = new MqttMessage(payload.getBytes());
|
|
|
message.setQos(qos);
|
|
|
message.setRetained(retained);
|
|
|
// 异步发送,waitForCompletion()等待发送完成
|
|
|
mqttClient.publish(topic, message).waitForCompletion();
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
|
|
|
#### 4. 实现消息接收(Subscriber)
|
|
|
通过`MqttCallback`接口监听消息和连接状态:
|
|
|
|
|
|
```java
|
|
|
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
|
|
|
@Service
|
|
|
public class MqttSubscriber {
|
|
|
|
|
|
@Autowired
|
|
|
private MqttAsyncClient mqttClient;
|
|
|
|
|
|
@Value("${mqtt.default-topic}")
|
|
|
private String defaultTopic;
|
|
|
|
|
|
@Value("${mqtt.qos}")
|
|
|
private int defaultQos;
|
|
|
|
|
|
// 初始化时订阅主题
|
|
|
@PostConstruct
|
|
|
public void subscribe() throws Exception {
|
|
|
// 订阅默认主题,指定QoS和消息监听器
|
|
|
mqttClient.subscribe(defaultTopic, defaultQos, new IMqttMessageListener() {
|
|
|
@Override
|
|
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
String payload = new String(message.getPayload());
|
|
|
System.out.println("收到消息:主题=" + topic + ",内容=" + payload);
|
|
|
// 处理消息逻辑(如存入数据库、触发业务操作等)
|
|
|
}
|
|
|
}).waitForCompletion();
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
|
|
|
### 四、高级用法:基于Spring Integration MQTT
|
|
|
Spring Integration提供了更符合Spring风格的MQTT集成方式,通过消息通道(MessageChannel)和注解简化开发。
|
|
|
|
|
|
#### 1. 配置Integration MQTT
|
|
|
```java
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.context.annotation.Bean;
|
|
|
import org.springframework.context.annotation.Configuration;
|
|
|
import org.springframework.integration.channel.DirectChannel;
|
|
|
import org.springframework.integration.core.MessageProducer;
|
|
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
|
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
|
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
|
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
|
|
import org.springframework.messaging.MessageChannel;
|
|
|
import org.springframework.messaging.MessageHandler;
|
|
|
|
|
|
@Configuration
|
|
|
public class MqttIntegrationConfig {
|
|
|
|
|
|
@Value("${mqtt.broker-url}")
|
|
|
private String brokerUrl;
|
|
|
|
|
|
@Value("${mqtt.client-id}")
|
|
|
private String clientId;
|
|
|
|
|
|
@Value("${mqtt.username}")
|
|
|
private String username;
|
|
|
|
|
|
@Value("${mqtt.password}")
|
|
|
private String password;
|
|
|
|
|
|
// 客户端工厂
|
|
|
@Bean
|
|
|
public MqttPahoClientFactory mqttClientFactory() {
|
|
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
|
|
MqttConnectOptions options = new MqttConnectOptions();
|
|
|
options.setServerURIs(new String[]{brokerUrl});
|
|
|
options.setUserName(username);
|
|
|
options.setPassword(password.toCharArray());
|
|
|
factory.setConnectionOptions(options);
|
|
|
return factory;
|
|
|
}
|
|
|
|
|
|
// 接收消息的通道
|
|
|
@Bean
|
|
|
public MessageChannel mqttInputChannel() {
|
|
|
return new DirectChannel();
|
|
|
}
|
|
|
|
|
|
// 入站适配器(接收消息)
|
|
|
@Bean
|
|
|
public MessageProducer inbound() {
|
|
|
// 订阅主题:device/data,客户端ID加后缀避免与发送端冲突
|
|
|
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
|
|
|
clientId + "-inbound", mqttClientFactory(), "device/data");
|
|
|
adapter.setConverter(new DefaultPahoMessageConverter()); // 消息转换器
|
|
|
adapter.setQos(1); // QoS等级
|
|
|
adapter.setOutputChannel(mqttInputChannel()); // 绑定到接收通道
|
|
|
return adapter;
|
|
|
}
|
|
|
|
|
|
// 处理接收的消息(使用@ServiceActivator)
|
|
|
@Bean
|
|
|
@ServiceActivator(inputChannel = "mqttInputChannel")
|
|
|
public MessageHandler handler() {
|
|
|
return message -> {
|
|
|
String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
|
|
|
String payload = message.getPayload().toString();
|
|
|
System.out.println("Integration接收消息:主题=" + topic + ",内容=" + payload);
|
|
|
};
|
|
|
}
|
|
|
|
|
|
// 发送消息的通道
|
|
|
@Bean
|
|
|
public MessageChannel mqttOutputChannel() {
|
|
|
return new DirectChannel();
|
|
|
}
|
|
|
|
|
|
// 出站适配器(发送消息)
|
|
|
@Bean
|
|
|
@ServiceActivator(inputChannel = "mqttOutputChannel")
|
|
|
public MessageHandler outbound() {
|
|
|
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
|
|
|
clientId + "-outbound", mqttClientFactory());
|
|
|
handler.setAsync(true); // 异步发送
|
|
|
handler.setDefaultTopic("device/control"); // 默认发送主题
|
|
|
handler.setDefaultQos(1); // 默认QoS
|
|
|
return handler;
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
|
|
|
#### 2. 使用Integration发送消息
|
|
|
通过`MessageChannel`发送消息:
|
|
|
|
|
|
```java
|
|
|
import org.springframework.integration.support.MessageBuilder;
|
|
|
import org.springframework.messaging.MessageChannel;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
|
|
|
@Service
|
|
|
public class IntegrationMqttPublisher {
|
|
|
|
|
|
// 注入发送通道
|
|
|
@Resource(name = "mqttOutputChannel")
|
|
|
private MessageChannel mqttOutputChannel;
|
|
|
|
|
|
public void send(String payload) {
|
|
|
// 发送到默认主题
|
|
|
mqttOutputChannel.send(MessageBuilder.withPayload(payload).build());
|
|
|
}
|
|
|
|
|
|
public void sendToTopic(String topic, String payload) {
|
|
|
// 发送到指定主题(通过header指定)
|
|
|
mqttOutputChannel.send(MessageBuilder.withPayload(payload)
|
|
|
.setHeader("mqtt_topic", topic)
|
|
|
.build());
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
|
|
|
### 五、注意事项
|
|
|
1. **客户端ID唯一性**:同一Broker下客户端ID不可重复,否则会导致连接被强制断开(可通过`clientId + 随机数`避免)。
|
|
|
2. **QoS选择**:根据业务可靠性要求选择QoS,物联网场景常用QoS 1(平衡可靠性和性能)。
|
|
|
3. **断线重连**:Paho客户端默认支持重连,可通过`MqttConnectOptions.setAutomaticReconnect(true)`增强重连逻辑。
|
|
|
4. **消息持久化**:若需避免消息丢失,可使用`FilePersistence`替代`MemoryPersistence`,并设置`cleanSession=false`。
|
|
|
5. **主题设计**:合理规划主题层级(如`device/{设备ID}/temp`),便于管理和订阅。
|
|
|
|
|
|
|
|
|
通过以上方式,Spring Boot可快速集成MQTT实现消息的发布与订阅,适用于物联网设备通信、分布式系统通知等场景。根据业务复杂度,可选择基础Paho客户端或Spring Integration简化开发。 |