You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

332 lines
12 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

在物联网IoT和分布式系统中MQTTMessage Queuing Telemetry Transport是一种轻量级、基于发布/订阅模式的消息传输协议特别适合带宽有限、网络不稳定的场景。Spring Boot作为主流的Java开发框架通过集成MQTT客户端库如Eclipse Paho可以快速实现MQTT通信功能。
### 一、MQTT核心概念
在详解Spring Boot集成MQTT前需先了解几个核心概念
- **Broker**MQTT服务器如Eclipse Mosquitto、EMQX负责接收客户端发送的消息并转发给订阅者。
- **客户端Client**分为发布者Publisher和订阅者Subscriber同一客户端可同时扮演两种角色。
- **主题Topic**:消息的分类标识(如`device/temp`),支持层级结构和通配符(`+`匹配单级、`#`匹配多级)。
- **QoSQuality of Service**消息传输质量等级分3级
- QoS 0最多一次消息可能丢失不确认
- QoS 1至少一次确保消息到达可能重复
- QoS 2刚好一次确保消息唯一到达最可靠但开销大
- **遗嘱消息Last Will and Testament**客户端异常断开时Broker自动向指定主题发送的消息。
### 二、Spring Boot集成MQTT的核心依赖
Spring Boot本身不直接提供MQTT Starter通常通过集成**Eclipse Paho MQTT客户端**实现。需在`pom.xml`中添加依赖:
```xml
<!-- MQTT客户端核心依赖 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!-- 可选Spring Integration MQTT更符合Spring编程模型 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.15</version>
</dependency>
```
### 三、基础实现基于Paho客户端的MQTT通信
#### 1. 配置MQTT连接参数
在`application.yml`中配置Broker地址、客户端ID等信息
```yaml
mqtt:
broker-url: tcp://localhost:1883 # MQTT Broker地址TCP协议
client-id: springboot-mqtt-client # 客户端唯一ID避免重复可加随机数
username: admin # 可选Broker认证用户名
password: 123456 # 可选Broker认证密码
keep-alive: 60 # 心跳间隔(秒)
default-topic: device/data # 默认主题
qos: 1 # 默认QoS等级
```
#### 2. 配置MQTT客户端工厂
通过`@Configuration`创建MQTT客户端工厂和连接选项
```java
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttConfig {
@Value("${mqtt.broker-url}")
private String brokerUrl;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.keep-alive}")
private int keepAlive;
// 配置连接选项
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl}); // 支持多个Broker地址
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setKeepAliveInterval(keepAlive);
options.setCleanSession(false); // 不清除会话(重连后保留订阅关系)
// 配置遗嘱消息(可选)
options.setWill("device/offline", "客户端断开连接".getBytes(), 1, false);
return options;
}
// 创建MQTT客户端实例异步客户端推荐使用
@Bean
public MqttAsyncClient mqttAsyncClient() throws Exception {
// MemoryPersistence消息临时存储在内存可选FilePersistence持久化到文件
MqttAsyncClient client = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence());
// 连接Broker
client.connect(mqttConnectOptions()).waitForCompletion();
return client;
}
}
```
#### 3. 实现消息发送Publisher
创建消息发送服务,封装发送逻辑:
```java
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class MqttPublisher {
@Autowired
private MqttAsyncClient mqttClient;
@Value("${mqtt.default-topic}")
private String defaultTopic;
@Value("${mqtt.qos}")
private int defaultQos;
/**
* 发送消息到默认主题
*/
public void send(String payload) throws Exception {
send(defaultTopic, payload, defaultQos, false);
}
/**
* 发送消息到指定主题
* @param topic 主题
* @param payload 消息内容
* @param qos QoS等级
* @param retained 是否保留消息Broker存储最后一条保留消息新订阅者会立即收到
*/
public void send(String topic, String payload, int qos, boolean retained) throws Exception {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(qos);
message.setRetained(retained);
// 异步发送waitForCompletion()等待发送完成
mqttClient.publish(topic, message).waitForCompletion();
}
}
```
#### 4. 实现消息接收Subscriber
通过`MqttCallback`接口监听消息和连接状态:
```java
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
public class MqttSubscriber {
@Autowired
private MqttAsyncClient mqttClient;
@Value("${mqtt.default-topic}")
private String defaultTopic;
@Value("${mqtt.qos}")
private int defaultQos;
// 初始化时订阅主题
@PostConstruct
public void subscribe() throws Exception {
// 订阅默认主题指定QoS和消息监听器
mqttClient.subscribe(defaultTopic, defaultQos, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
System.out.println("收到消息:主题=" + topic + ",内容=" + payload);
// 处理消息逻辑(如存入数据库、触发业务操作等)
}
}).waitForCompletion();
}
}
```
### 四、高级用法基于Spring Integration MQTT
Spring Integration提供了更符合Spring风格的MQTT集成方式通过消息通道MessageChannel和注解简化开发。
#### 1. 配置Integration MQTT
```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttIntegrationConfig {
@Value("${mqtt.broker-url}")
private String brokerUrl;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
// 客户端工厂
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setUserName(username);
options.setPassword(password.toCharArray());
factory.setConnectionOptions(options);
return factory;
}
// 接收消息的通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
// 入站适配器(接收消息)
@Bean
public MessageProducer inbound() {
// 订阅主题device/data客户端ID加后缀避免与发送端冲突
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId + "-inbound", mqttClientFactory(), "device/data");
adapter.setConverter(new DefaultPahoMessageConverter()); // 消息转换器
adapter.setQos(1); // QoS等级
adapter.setOutputChannel(mqttInputChannel()); // 绑定到接收通道
return adapter;
}
// 处理接收的消息(使用@ServiceActivator
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
String payload = message.getPayload().toString();
System.out.println("Integration接收消息主题=" + topic + ",内容=" + payload);
};
}
// 发送消息的通道
@Bean
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}
// 出站适配器(发送消息)
@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MessageHandler outbound() {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
clientId + "-outbound", mqttClientFactory());
handler.setAsync(true); // 异步发送
handler.setDefaultTopic("device/control"); // 默认发送主题
handler.setDefaultQos(1); // 默认QoS
return handler;
}
}
```
#### 2. 使用Integration发送消息
通过`MessageChannel`发送消息:
```java
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class IntegrationMqttPublisher {
// 注入发送通道
@Resource(name = "mqttOutputChannel")
private MessageChannel mqttOutputChannel;
public void send(String payload) {
// 发送到默认主题
mqttOutputChannel.send(MessageBuilder.withPayload(payload).build());
}
public void sendToTopic(String topic, String payload) {
// 发送到指定主题通过header指定
mqttOutputChannel.send(MessageBuilder.withPayload(payload)
.setHeader("mqtt_topic", topic)
.build());
}
}
```
### 五、注意事项
1. **客户端ID唯一性**同一Broker下客户端ID不可重复否则会导致连接被强制断开可通过`clientId + 随机数`避免)。
2. **QoS选择**根据业务可靠性要求选择QoS物联网场景常用QoS 1平衡可靠性和性能
3. **断线重连**Paho客户端默认支持重连可通过`MqttConnectOptions.setAutomaticReconnect(true)`增强重连逻辑。
4. **消息持久化**:若需避免消息丢失,可使用`FilePersistence`替代`MemoryPersistence`,并设置`cleanSession=false`。
5. **主题设计**:合理规划主题层级(如`device/{设备ID}/temp`),便于管理和订阅。
通过以上方式Spring Boot可快速集成MQTT实现消息的发布与订阅适用于物联网设备通信、分布式系统通知等场景。根据业务复杂度可选择基础Paho客户端或Spring Integration简化开发。