在物联网(IoT)和分布式系统中,MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布/订阅模式的消息传输协议,特别适合带宽有限、网络不稳定的场景。Spring Boot作为主流的Java开发框架,通过集成MQTT客户端库(如Eclipse Paho)可以快速实现MQTT通信功能。
### 一、MQTT核心概念
在详解Spring Boot集成MQTT前,需先了解几个核心概念:
- **Broker**:MQTT服务器(如Eclipse Mosquitto、EMQX),负责接收客户端发送的消息并转发给订阅者。
- **客户端(Client)**:分为发布者(Publisher)和订阅者(Subscriber),同一客户端可同时扮演两种角色。
- **主题(Topic)**:消息的分类标识(如`device/temp`),支持层级结构和通配符(`+`匹配单级、`#`匹配多级)。
- **QoS(Quality of Service)**:消息传输质量等级,分3级:
- QoS 0:最多一次(消息可能丢失,不确认)。
- QoS 1:至少一次(确保消息到达,可能重复)。
- QoS 2:刚好一次(确保消息唯一到达,最可靠但开销大)。
- **遗嘱消息(Last Will and Testament)**:客户端异常断开时,Broker自动向指定主题发送的消息。
### 二、Spring Boot集成MQTT的核心依赖
Spring Boot本身不直接提供MQTT Starter,通常通过集成**Eclipse Paho MQTT客户端**实现。需在`pom.xml`中添加依赖:
```xml
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.5
org.springframework.integration
spring-integration-mqtt
5.5.15
```
### 三、基础实现:基于Paho客户端的MQTT通信
#### 1. 配置MQTT连接参数
在`application.yml`中配置Broker地址、客户端ID等信息:
```yaml
mqtt:
broker-url: tcp://localhost:1883 # MQTT Broker地址(TCP协议)
client-id: springboot-mqtt-client # 客户端唯一ID(避免重复,可加随机数)
username: admin # 可选:Broker认证用户名
password: 123456 # 可选:Broker认证密码
keep-alive: 60 # 心跳间隔(秒)
default-topic: device/data # 默认主题
qos: 1 # 默认QoS等级
```
#### 2. 配置MQTT客户端工厂
通过`@Configuration`创建MQTT客户端工厂和连接选项:
```java
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttConfig {
@Value("${mqtt.broker-url}")
private String brokerUrl;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.keep-alive}")
private int keepAlive;
// 配置连接选项
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl}); // 支持多个Broker地址
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setKeepAliveInterval(keepAlive);
options.setCleanSession(false); // 不清除会话(重连后保留订阅关系)
// 配置遗嘱消息(可选)
options.setWill("device/offline", "客户端断开连接".getBytes(), 1, false);
return options;
}
// 创建MQTT客户端实例(异步客户端,推荐使用)
@Bean
public MqttAsyncClient mqttAsyncClient() throws Exception {
// MemoryPersistence:消息临时存储在内存(可选:FilePersistence持久化到文件)
MqttAsyncClient client = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence());
// 连接Broker
client.connect(mqttConnectOptions()).waitForCompletion();
return client;
}
}
```
#### 3. 实现消息发送(Publisher)
创建消息发送服务,封装发送逻辑:
```java
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class MqttPublisher {
@Autowired
private MqttAsyncClient mqttClient;
@Value("${mqtt.default-topic}")
private String defaultTopic;
@Value("${mqtt.qos}")
private int defaultQos;
/**
* 发送消息到默认主题
*/
public void send(String payload) throws Exception {
send(defaultTopic, payload, defaultQos, false);
}
/**
* 发送消息到指定主题
* @param topic 主题
* @param payload 消息内容
* @param qos QoS等级
* @param retained 是否保留消息(Broker存储最后一条保留消息,新订阅者会立即收到)
*/
public void send(String topic, String payload, int qos, boolean retained) throws Exception {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(qos);
message.setRetained(retained);
// 异步发送,waitForCompletion()等待发送完成
mqttClient.publish(topic, message).waitForCompletion();
}
}
```
#### 4. 实现消息接收(Subscriber)
通过`MqttCallback`接口监听消息和连接状态:
```java
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
public class MqttSubscriber {
@Autowired
private MqttAsyncClient mqttClient;
@Value("${mqtt.default-topic}")
private String defaultTopic;
@Value("${mqtt.qos}")
private int defaultQos;
// 初始化时订阅主题
@PostConstruct
public void subscribe() throws Exception {
// 订阅默认主题,指定QoS和消息监听器
mqttClient.subscribe(defaultTopic, defaultQos, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
System.out.println("收到消息:主题=" + topic + ",内容=" + payload);
// 处理消息逻辑(如存入数据库、触发业务操作等)
}
}).waitForCompletion();
}
}
```
### 四、高级用法:基于Spring Integration MQTT
Spring Integration提供了更符合Spring风格的MQTT集成方式,通过消息通道(MessageChannel)和注解简化开发。
#### 1. 配置Integration MQTT
```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttIntegrationConfig {
@Value("${mqtt.broker-url}")
private String brokerUrl;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
// 客户端工厂
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setUserName(username);
options.setPassword(password.toCharArray());
factory.setConnectionOptions(options);
return factory;
}
// 接收消息的通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
// 入站适配器(接收消息)
@Bean
public MessageProducer inbound() {
// 订阅主题:device/data,客户端ID加后缀避免与发送端冲突
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId + "-inbound", mqttClientFactory(), "device/data");
adapter.setConverter(new DefaultPahoMessageConverter()); // 消息转换器
adapter.setQos(1); // QoS等级
adapter.setOutputChannel(mqttInputChannel()); // 绑定到接收通道
return adapter;
}
// 处理接收的消息(使用@ServiceActivator)
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
String payload = message.getPayload().toString();
System.out.println("Integration接收消息:主题=" + topic + ",内容=" + payload);
};
}
// 发送消息的通道
@Bean
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}
// 出站适配器(发送消息)
@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MessageHandler outbound() {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
clientId + "-outbound", mqttClientFactory());
handler.setAsync(true); // 异步发送
handler.setDefaultTopic("device/control"); // 默认发送主题
handler.setDefaultQos(1); // 默认QoS
return handler;
}
}
```
#### 2. 使用Integration发送消息
通过`MessageChannel`发送消息:
```java
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class IntegrationMqttPublisher {
// 注入发送通道
@Resource(name = "mqttOutputChannel")
private MessageChannel mqttOutputChannel;
public void send(String payload) {
// 发送到默认主题
mqttOutputChannel.send(MessageBuilder.withPayload(payload).build());
}
public void sendToTopic(String topic, String payload) {
// 发送到指定主题(通过header指定)
mqttOutputChannel.send(MessageBuilder.withPayload(payload)
.setHeader("mqtt_topic", topic)
.build());
}
}
```
### 五、注意事项
1. **客户端ID唯一性**:同一Broker下客户端ID不可重复,否则会导致连接被强制断开(可通过`clientId + 随机数`避免)。
2. **QoS选择**:根据业务可靠性要求选择QoS,物联网场景常用QoS 1(平衡可靠性和性能)。
3. **断线重连**:Paho客户端默认支持重连,可通过`MqttConnectOptions.setAutomaticReconnect(true)`增强重连逻辑。
4. **消息持久化**:若需避免消息丢失,可使用`FilePersistence`替代`MemoryPersistence`,并设置`cleanSession=false`。
5. **主题设计**:合理规划主题层级(如`device/{设备ID}/temp`),便于管理和订阅。
通过以上方式,Spring Boot可快速集成MQTT实现消息的发布与订阅,适用于物联网设备通信、分布式系统通知等场景。根据业务复杂度,可选择基础Paho客户端或Spring Integration简化开发。