consumer = (e) -> {
+ for (MqttEventHandler messageHandler : messageHandlers) {
+ if (!messageHandler.next(e, MqttItem.this)) {
+ return;
+ }
+ }
+ };
+
+
+ private IMqttAsyncClient client;
+
+ private final ScheduledExecutorService executorService;
+
+ private ScheduledFuture> future;
+
+ private final AtomicBoolean connecting = new AtomicBoolean(false);
+
+ /**
+ * 构造函数
+ *
+ * @param configName 配置名称
+ * @param config MQTT配置
+ * @param executorService 定时任务执行器
+ */
+ public MqttItem(String configName, Config config, ScheduledExecutorService executorService) {
+ this.configName = configName;
+ this.config = config;
+ this.executorService = executorService;
+ this.clientId = config.getClientId() + "_" + Long.toString(System.currentTimeMillis() - 1735660800000L + new Random().nextInt(1024), 36);
+ connect();
+ schedule();
+ }
+
+ /**
+ * 获取MQTT客户端
+ *
+ * @return MQTT异步客户端
+ * @throws RuntimeException 当客户端正在连接或未连接时抛出
+ * @throws NullPointerException 当客户端未创建时抛出
+ */
+ public IMqttAsyncClient getClient() {
+ if (connecting.get()) {
+ throw new RuntimeException("mqtt客户端连接中:" + configName);
+ }
+
+ if (client == null) {
+ throw new NullPointerException("mqtt客户端未创建:" + configName);
+ }
+ if (!client.isConnected()) {
+ throw new RuntimeException("mqtt客户端未连接:" + configName);
+ }
+ return client;
+ }
+
+
+ /**
+ * 定时检查连接状态并重连
+ */
+ private void schedule() {
+ future = executorService.scheduleWithFixedDelay(() -> {
+ if (client != null && client.isConnected()) {
+ return;
+ }
+ consumer.accept(new MqttReconnectionEvent(configName));
+ connect();
+
+ }, CHECK_GAP, CHECK_GAP, TimeUnit.SECONDS);
+ }
+
+ /**
+ * 连接到MQTT服务器
+ */
+ public void connect() {
+ if (connecting.get()) {
+ return;
+ }
+ connecting.set(true);
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception ignored) {
+
+ }
+ }
+ client = null;
+ try {
+ MqttConnectOptions options = mqttConnectOptions(config);
+ client = new MqttAsyncClient(config.getUrl(), clientId, new MemoryPersistence());
+ client.setCallback(new MqttCallback() {
+ @Override
+ public void connectionLost(Throwable cause) {
+ consumer.accept(new MqttConnectionLostEvent(configName, cause));
+ connecting.set(false);
+ connect();
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ if(log.isDebugEnabled()){
+ log.debug("mqtt message: {}={}",topic,new String(message.getPayload(), StandardCharsets.UTF_8));
+ }
+ consumer.accept(new MqttMessageEvent(configName, topic, message));
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ consumer.accept(new MqttMessageDeliveryEvent(configName, token));
+ }
+ });
+ client.connect(options).waitForCompletion(5000);
+ if (config.getSubscribes() != null && !config.getSubscribes().isEmpty()) {
+ client.subscribe(
+ config.getSubscribes().stream().map(MqttProperties.Topic::getTopic).toArray(String[]::new),
+ config.getSubscribes().stream().mapToInt(MqttProperties.Topic::getQos).toArray()
+ );
+ }
+ consumer.accept(new MqttConnectionSuccessEvent(configName));
+ } catch (Exception e) {
+ consumer.accept(new MqttConnectionExceptionEvent(configName, e));
+ } finally {
+ connecting.set(false);
+ }
+ }
+
+ /**
+ * 销毁MQTT客户端,释放资源
+ */
+ public void destroy() {
+ if (future != null) {
+ future.cancel(true);
+ future = null;
+ }
+
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception ignored) {
+
+ }
+ client = null;
+ }
+
+ }
+
+ /**
+ * 创建MQTT连接选项
+ *
+ * @param properties MQTT配置属性
+ * @return MQTT连接选项
+ */
+ public MqttConnectOptions mqttConnectOptions(MqttProperties.Config properties) {
+ // 连接设置
+ MqttConnectOptions options = new MqttConnectOptions();
+ // 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
+ // 设置为true表示每次连接服务器都是以新的身份
+ options.setCleanSession(properties.getCleanSession());
+// options.setCleanSession(true);
+ // 设置连接用户名
+ options.setUserName(properties.getUsername());
+ // 设置连接密码
+ options.setPassword(properties.getPassword().toCharArray());
+ // 设置超时时间,单位为秒
+ options.setConnectionTimeout(properties.getConnectionTimeout());
+ // 设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
+ options.setKeepAliveInterval(properties.getKeepAliveInterval());
+ // 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
+ options.setWill(properties.getWillTopic(), properties.getWillMessage().getBytes(), properties.getWillQos(), false);
+ // 设置重连
+// options.setAutomaticReconnect(properties.getAutomaticReconnect());
+ options.setAutomaticReconnect(false);
+// options.setMaxReconnectDelay(properties.getMaxReconnectDelay());
+ options.setCustomWebSocketHeaders(properties.getCustomWebSocketHeaders());
+ options.setSSLProperties(properties.getSslProperties());
+ options.setMqttVersion(properties.getMqttVersion().getVersion());
+ options.setMaxInflight(properties.getMaxInflight());
+
+ options.setServerURIs(properties.getUrls().toArray(new String[0]));
+
+ return options;
+ }
+
+
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java
new file mode 100644
index 0000000..7449d4a
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java
@@ -0,0 +1,178 @@
+package com.ruoyi.mqtt;
+
+
+import com.ruoyi.mqtt.config.MqttConfig;
+import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * MQTT工具类,提供便捷的MQTT消息发送和客户端获取功能。
+ *
+ * 该工具类封装了{@link MqttFactory}的常用操作,简化了MQTT消息的发送流程。
+ * 支持发送字节数组和字符串类型的消息,并可指定服务质量(QoS)和保留标志。
+ *
+ */
+public class MqttUtil {
+
+
+ private MqttUtil() {
+
+ }
+
+ /**
+ * 获取MQTT工厂实例
+ *
+ * @return MQTT工厂实例
+ */
+ public static MqttFactory getFactory() {
+ return MqttConfig.getMqttFactory();
+ }
+
+ /**
+ * 获取默认配置的MQTT操作项
+ *
+ * @return MQTT操作项
+ */
+ public static MqttItem getItem() {
+ return getFactory().get();
+ }
+
+ /**
+ * 根据配置名称获取MQTT操作项
+ *
+ * @param configName 配置名称
+ * @return MQTT操作项
+ */
+ public static MqttItem getItem(String configName) {
+ return getFactory().get(configName);
+ }
+
+ /**
+ * 获取默认配置的MQTT客户端
+ *
+ * @return MQTT异步客户端
+ */
+ public static IMqttAsyncClient getClient() {
+ return getItem().getClient();
+ }
+
+ /**
+ * 根据配置名称获取MQTT客户端
+ *
+ * @param configName 配置名称
+ * @return MQTT异步客户端
+ */
+ public static IMqttAsyncClient getClient(String configName) {
+ return getItem(configName).getClient();
+ }
+
+
+
+
+ /**
+ * 发送MQTT消息
+ *
+ * @param configName 配置项名称
+ * @param sendName 配置的发送项名称
+ * @param payload 消息负载(字节数组)
+ * @param params 配置的发送项参数,替换发送项中的{0},{1}...
+ * @throws MqttException MQTT异常
+ */
+ public static IMqttDeliveryToken send(String configName, String sendName, byte[] payload, String... params)throws MqttException {
+ return getFactory().send(configName, sendName, payload, params);
+ }
+
+ /**
+ * 发送MQTT消息
+ *
+ * @param configName 配置项名称
+ * @param sendName 配置的发送项名称
+ * @param payload 消息负载(字符串)
+ * @param params 配置的发送项参数,替换发送项中的{0},{1}...
+ * @throws MqttException MQTT异常
+ */
+ public static IMqttDeliveryToken send(String configName, String sendName, String payload, String... params)throws MqttException {
+ return getFactory().send(configName, sendName, payload.getBytes(StandardCharsets.UTF_8), params);
+ }
+
+ /**
+ * 发送MQTT消息(使用默认配置)
+ *
+ * @param sendName 配置的发送项名称
+ * @param payload 消息负载(字节数组)
+ * @param params 配置的发送项参数,替换发送项中的{0},{1}...
+ * @throws MqttException MQTT异常
+ */
+ public static IMqttDeliveryToken send(String sendName, byte[] payload, String... params)throws MqttException {
+ return getFactory().send(sendName, payload, params);
+ }
+
+ /**
+ * 发送MQTT消息(使用默认配置)
+ *
+ * @param sendName 配置的发送项名称
+ * @param payload 消息负载(字符串)
+ * @param params 配置的发送项参数,替换发送项中的{0},{1}...
+ * @throws MqttException MQTT异常
+ */
+ public static IMqttDeliveryToken send(String sendName, String payload, String... params)throws MqttException {
+ return getFactory().send(sendName, payload.getBytes(StandardCharsets.UTF_8), params);
+ }
+
+ /**
+ * 发送MQTT消息
+ *
+ * @param configName 配置项名称
+ * @param topic 主题
+ * @param payload 消息负载(字节数组)
+ * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次
+ * @param retained 保留消息
+ * @throws MqttException MQTT异常
+ */
+ public static IMqttDeliveryToken send(String configName, String topic, byte[] payload, int qos, boolean retained) throws MqttException{
+ return getFactory().send(configName, topic, payload, qos, retained);
+ }
+
+ /**
+ * 发送MQTT消息
+ *
+ * @param configName 配置项名称
+ * @param topic 主题
+ * @param payload 消息负载(字符串)
+ * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次
+ * @param retained 保留消息
+ * @throws MqttException MQTT异常
+ */
+ public static IMqttDeliveryToken send(String configName, String topic, String payload, int qos, boolean retained) throws MqttException{
+ return getFactory().send(configName, topic, payload.getBytes(StandardCharsets.UTF_8), qos, retained);
+ }
+
+ /**
+ * 发送MQTT消息(使用默认配置)
+ *
+ * @param topic 主题
+ * @param payload 消息负载(字节数组)
+ * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次
+ * @param retained 保留消息
+ * @throws MqttException MQTT异常
+ */
+ public static IMqttDeliveryToken send(String topic, byte[] payload, int qos, boolean retained) throws MqttException{
+ return getFactory().send(topic, payload, qos, retained);
+ }
+
+ /**
+ * 发送MQTT消息(使用默认配置)
+ *
+ * @param topic 主题
+ * @param payload 消息负载(字符串)
+ * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次
+ * @param retained 保留消息
+ * @throws MqttException MQTT异常
+ */
+ public static IMqttDeliveryToken send(String topic, String payload, int qos, boolean retained) throws MqttException{
+ return getFactory().send(topic, payload.getBytes(StandardCharsets.UTF_8), qos, retained);
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttVersion.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttVersion.java
new file mode 100644
index 0000000..5d8662a
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttVersion.java
@@ -0,0 +1,37 @@
+package com.ruoyi.mqtt;
+
+/**
+ * MQTT协议版本枚举类
+ *
+ * 定义了支持的MQTT协议版本,包括默认版本、3.1版本和3.1.1版本。
+ *
+ */
+public enum MqttVersion {
+ /** 默认MQTT版本 */
+ MQTT_VERSION_DEFAULT(0),
+ /** MQTT 3.1版本 */
+ MQTT_VERSION_3_1(3),
+ /** MQTT 3.1.1版本 */
+ MQTT_VERSION_3_1_1(4);
+
+ private int version;
+
+ /**
+ * 构造函数
+ *
+ * @param version 版本号
+ */
+ private MqttVersion(int version) {
+ this.version = version;
+ }
+
+
+ /**
+ * 获取版本号
+ *
+ * @return 版本号
+ */
+ public int getVersion() {
+ return version;
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java
new file mode 100644
index 0000000..4f2daf2
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java
@@ -0,0 +1,181 @@
+package com.ruoyi.mqtt.config;
+
+import com.ruoyi.mqtt.MqttEventHandler;
+import com.ruoyi.mqtt.MqttFactory;
+import com.ruoyi.mqtt.MqttItem;
+import com.ruoyi.mqtt.MqttUtil;
+import com.ruoyi.mqtt.event.MqttEvent;
+import com.ruoyi.mqtt.event.MqttSendEvent;
+import com.ruoyi.mqtt.event.MqttSendTopicEvent;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.event.EventListener;
+import org.springframework.core.Ordered;
+import org.springframework.core.PriorityOrdered;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+@Configuration
+@EnableConfigurationProperties(MqttProperties.class)
+@Slf4j
+@RequiredArgsConstructor
+public class MqttConfig implements MqttFactory, MqttEventHandler, PriorityOrdered {
+
+ private final MqttProperties properties;
+ private final ApplicationContext act;
+
+ @Getter
+ private static MqttFactory mqttFactory;
+
+ private ScheduledExecutorService executorService;
+
+ private final Map clients = new ConcurrentHashMap<>();
+
+ @PostConstruct
+ public void init() throws Exception {
+ if (!properties.getEnabled()) {
+ log.info("mqtt模块未激活");
+ return;
+ }
+ log.info("mqtt模块启动中...");
+ try {
+ executorService = act.getBeansOfType(ScheduledExecutorService.class).values().stream().findFirst().get();
+ } catch (Exception e) {
+ executorService = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());
+ }
+ properties.getConfigs().forEach((a, b) -> {
+ if (!b.getEnabled()) {
+ return;
+ }
+ put(a, b, this);
+ });
+ mqttFactory = this;
+ }
+
+ @Override
+ public boolean next(MqttEvent event, MqttItem item) {
+ log.debug("mqtt event: {} = {}", event.getConfigName(), event.getClass().getName());
+ act.publishEvent(event);
+ return true;
+ }
+
+ @PreDestroy
+ public void destroy() {
+ clients.forEach((a, b) -> {
+ try {
+ b.destroy();
+ } catch (Exception e) {
+ log.error("mqtt客户端关闭失败:" + a, e);
+ }
+ });
+ try {
+ executorService.shutdown();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public MqttItem get(String configName) {
+ return clients.get(configName);
+ }
+
+
+ public void put(String configName, MqttProperties.Config config, MqttEventHandler... handlers) {
+ if (clients.containsKey(configName)) {
+ throw new RuntimeException("配置项已经存在");
+ }
+ MqttItem mqttItem = new MqttItem(configName, config, executorService);
+ if (handlers != null && handlers.length > 0) {
+ Collections.addAll(mqttItem.getMessageHandlers(), handlers);
+ }
+ clients.put(configName, mqttItem);
+ log.debug("mqtt配置项添加:" + configName);
+ }
+
+ public boolean contains(String configName) {
+ return clients.containsKey(configName);
+ }
+
+ public void remove(String configName) {
+ if (contains(configName)) {
+ get(configName).destroy();
+ clients.remove(configName);
+ }
+ }
+
+
+ public IMqttDeliveryToken send(String configName, String topic, byte[] payload, int qos, boolean retained) throws MqttException {
+ MqttItem item = get(configName);
+ if (item == null) {
+ throw new RuntimeException("mqtt客户端未找到:" + configName);
+ }
+ log.debug("mqtt发送成功:configName={},topic={}", configName, topic);
+ return item.getClient().publish(topic, payload, qos, retained);
+ }
+
+ public IMqttDeliveryToken send(String configName, String sendName, byte[] payload, String... params) throws MqttException {
+
+ MqttItem item = get(configName);
+ if (item == null) {
+ throw new RuntimeException("mqtt客户端未找到:" + configName);
+ }
+
+ MqttProperties.Topic topic = item.getConfig().getSends().get(sendName);
+ if (topic == null) {
+ throw new RuntimeException("mqtt配置的主题未找到:" + configName + " = " + sendName);
+ }
+ String topicTempalte = topic.getTopic();
+ if (params != null) {
+ for (int i = 0; i < params.length; i++) {
+ topicTempalte = topicTempalte.replace("{" + i + "}", params[i]);
+ }
+ }
+
+ log.debug("mqtt发送成功:configName={},sendName={},topic={}", configName, sendName, topicTempalte);
+ return item.getClient().publish(topicTempalte, payload, topic.getQos(), topic.getRetained());
+ }
+
+
+ @EventListener
+ public void listener(MqttSendEvent event) throws MqttException {
+ send(event.getConfigName(), event.getSendName(), event.getPayload(), event.getParams());
+ }
+
+ @EventListener
+ public void listener(MqttSendTopicEvent event) throws MqttException {
+ send(event.getConfigName(), event.getTopic(), event.getPayload(), event.getQos(), event.isRetained());
+ }
+
+
+// @Scheduled(cron = "*/5 * * * * ?")
+// public void test() {
+// String s = Long.toString(System.currentTimeMillis(), 36);
+// try {
+//// act.publishEvent(new MqttSendEvent("test",s.getBytes(StandardCharsets.UTF_8),s));
+// MqttUtil.send("test", s, new String[]{s});
+// log.info("test success:{}", s);
+// } catch (Exception e) {
+// log.info("test error:" + s, e);
+// }
+// }
+
+ @Override
+ public int getOrder() {
+ return Ordered.HIGHEST_PRECEDENCE; // 最高优先级
+ }
+
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java
new file mode 100644
index 0000000..5e8494d
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java
@@ -0,0 +1,194 @@
+package com.ruoyi.mqtt.config;
+
+
+import com.ruoyi.mqtt.MqttVersion;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+import java.util.*;
+
+/**
+ * MQTT配置属性类
+ *
+ * 该类定义了MQTT客户端的配置属性,包括是否启用、默认配置、以及各个MQTT客户端的详细配置。
+ *
+ */
+@Data
+@ConfigurationProperties(value = MqttProperties.PREFIX, ignoreInvalidFields = true, ignoreUnknownFields = true)
+public class MqttProperties {
+
+ public final static String DEFAULT = "default";
+
+ /**
+ * 配置前缀
+ */
+ public static final String PREFIX = "spring.mqtt";
+
+ /**
+ * 是否启用MQTT功能
+ * 默认值: false
+ */
+ private Boolean enabled = false;
+
+ /**
+ * 默认配置名称
+ * 默认值: "default"
+ */
+ private String defaultConfig = DEFAULT;
+
+ /**
+ * MQTT客户端配置映射,key为配置名称,value为具体的配置项
+ */
+ private Map configs = new HashMap<>();
+
+
+ /**
+ * MQTT主题配置类
+ *
+ * 该类定义了MQTT主题的相关配置,包括主题名称、服务质量(QoS)和是否保留消息。
+ *
+ */
+ @Data
+ public static class Topic {
+
+ /**
+ * 订阅主题可以使用 单级通配符 "+" 和多级通配符 "#"
+ * 发送主题可以使用如:{0},{1},{2},发送时使用params数组替换
+ */
+ private String topic;
+
+ /**
+ * 服务质量(QoS)
+ * 默认值: 0
+ */
+ private Integer qos = 0;
+
+ /**
+ * 是否保留消息,订阅主题时无需配置
+ * 默认值: false
+ */
+ private Boolean retained = false;
+ }
+
+ /**
+ * MQTT客户端配置类
+ *
+ * 该类定义了单个MQTT客户端的详细配置,包括连接信息、认证信息、心跳设置等。
+ *
+ */
+ @Data
+ public static class Config {
+
+ /**
+ * 是否启用该MQTT客户端配置
+ * 默认值: true
+ */
+ private Boolean enabled = true;
+
+ /**
+ * 客户端编号(唯一)
+ * 默认值: 基于系统时间生成的36进制字符串
+ */
+ private String clientId = Long.toString(System.currentTimeMillis(), 36);
+
+ /**
+ * 服务器URL
+ * 默认值: tcp://127.0.0.1:1883
+ */
+ private String url = "tcp://127.0.0.1:1883";
+
+ /**
+ * 服务器集群URL列表(可选)
+ */
+ private List urls = new ArrayList<>();
+
+ /**
+ * 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
+ * 设置为true表示每次连接服务器都是以新的身份
+ * 默认值: true
+ */
+ private Boolean cleanSession = true;
+
+ /**
+ * 用户名
+ * 默认值: admin
+ */
+ private String username = "admin";
+
+ /**
+ * 密码
+ * 默认值: 123456
+ */
+ private String password = "123456";
+
+ /**
+ * 连接超时时间,单位秒
+ * 默认值: 5
+ */
+ private Integer connectionTimeout = 5;
+
+ /**
+ * 心跳时间 单位为秒,表示服务器每隔60秒的时间向客户端发送心跳判断客户端是否在线
+ * 默认值: 60
+ */
+ private Integer keepAliveInterval = 60;
+
+ /**
+ * 遗嘱主题
+ * 默认值: will/topic
+ */
+ private String willTopic = "will/topic";
+
+ /**
+ * 遗嘱消息
+ * 默认值: offline
+ */
+ private String willMessage = "offline";
+
+ /**
+ * 遗嘱消息的服务质量(QoS)
+ * 默认值: 0
+ */
+ private Integer willQos = 0;
+
+ /**
+ * 订阅主题列表
+ */
+ private List subscribes = new ArrayList<>();
+
+ /**
+ * 是否自动重连
+ * 默认值: true (注释掉的配置)
+ */
+// private Boolean automaticReconnect = true;
+
+// private Integer maxReconnectDelay = Integer.MAX_VALUE;
+
+ /**
+ * 自定义WebSocket头部信息
+ */
+ private Properties customWebSocketHeaders = new Properties();
+
+ /**
+ * SSL属性配置
+ */
+ private Properties sslProperties = new Properties();
+
+ /**
+ * 发送主题映射,key为发送名称,value为具体的主题配置
+ */
+ private Map sends = new HashMap<>();
+
+ /**
+ * MQTT协议版本
+ * 默认值: MQTT_VERSION_DEFAULT
+ */
+ private MqttVersion mqttVersion = MqttVersion.MQTT_VERSION_DEFAULT;
+
+ /**
+ * 最大未确认消息数量
+ * 默认值: 10
+ */
+ private Integer maxInflight = 10;
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttConnectionExceptionEvent.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttConnectionExceptionEvent.java
new file mode 100644
index 0000000..ab07694
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttConnectionExceptionEvent.java
@@ -0,0 +1,21 @@
+package com.ruoyi.mqtt.event;
+
+import lombok.Getter;
+
+/**
+ * MQTT连接异常
+ */
+public final class MqttConnectionExceptionEvent extends MqttEvent{
+
+ @Getter
+ private Throwable cause;
+
+ public MqttConnectionExceptionEvent(Throwable cause) {
+ this.cause = cause;
+ }
+
+ public MqttConnectionExceptionEvent(String configName, Throwable cause) {
+ super(configName);
+ this.cause = cause;
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttConnectionLostEvent.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttConnectionLostEvent.java
new file mode 100644
index 0000000..0bbe2dc
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttConnectionLostEvent.java
@@ -0,0 +1,21 @@
+package com.ruoyi.mqtt.event;
+
+import lombok.Getter;
+
+/**
+ * MQTT连接丢失
+ */
+public final class MqttConnectionLostEvent extends MqttEvent{
+
+ @Getter
+ private Throwable cause;
+
+ public MqttConnectionLostEvent(Throwable cause) {
+ this.cause = cause;
+ }
+
+ public MqttConnectionLostEvent(String configName,Throwable cause) {
+ super(configName);
+ this.cause = cause;
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttConnectionSuccessEvent.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttConnectionSuccessEvent.java
new file mode 100644
index 0000000..45882f4
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttConnectionSuccessEvent.java
@@ -0,0 +1,14 @@
+package com.ruoyi.mqtt.event;
+
+/**
+ * MQTT连接成功
+ */
+public final class MqttConnectionSuccessEvent extends MqttEvent{
+
+ public MqttConnectionSuccessEvent() {
+ }
+
+ public MqttConnectionSuccessEvent(String configName) {
+ super(configName);
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttEvent.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttEvent.java
new file mode 100644
index 0000000..1fb8586
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttEvent.java
@@ -0,0 +1,26 @@
+package com.ruoyi.mqtt.event;
+
+import com.ruoyi.mqtt.config.MqttProperties;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * Mqtt事件
+ */
+public abstract class MqttEvent extends ApplicationEvent {
+ @Getter
+ private String configName = MqttProperties.DEFAULT;
+
+ public MqttEvent() {
+ this(MqttProperties.DEFAULT);
+ }
+
+ public MqttEvent(String configName) {
+ super(configName);
+ }
+
+ @Override
+ public String getSource() {
+ return (String)super.getSource();
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttMessageDeliveryEvent.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttMessageDeliveryEvent.java
new file mode 100644
index 0000000..390358f
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttMessageDeliveryEvent.java
@@ -0,0 +1,24 @@
+package com.ruoyi.mqtt.event;
+
+import lombok.Getter;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * 接受的消息
+ */
+public final class MqttMessageDeliveryEvent extends MqttEvent {
+
+ @Getter
+ private IMqttDeliveryToken token;
+
+
+ public MqttMessageDeliveryEvent(IMqttDeliveryToken token) {
+ this.token = token;
+ }
+
+ public MqttMessageDeliveryEvent(String configName, IMqttDeliveryToken token) {
+ super(configName);
+ this.token = token;
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttMessageEvent.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttMessageEvent.java
new file mode 100644
index 0000000..82cca86
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttMessageEvent.java
@@ -0,0 +1,27 @@
+package com.ruoyi.mqtt.event;
+
+import lombok.Getter;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * 接受的消息
+ */
+public final class MqttMessageEvent extends MqttEvent {
+
+ @Getter
+ private String topic;
+
+ @Getter
+ private MqttMessage message;
+
+ public MqttMessageEvent(String topic, MqttMessage message) {
+ this.topic = topic;
+ this.message = message;
+ }
+
+ public MqttMessageEvent(String configName, String topic, MqttMessage message) {
+ super(configName);
+ this.topic = topic;
+ this.message = message;
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttReconnectionEvent.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttReconnectionEvent.java
new file mode 100644
index 0000000..6102d8c
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttReconnectionEvent.java
@@ -0,0 +1,15 @@
+
+package com.ruoyi.mqtt.event;
+
+/**
+ * MQTT重连开始
+ */
+public final class MqttReconnectionEvent extends MqttEvent{
+
+ public MqttReconnectionEvent() {
+ }
+
+ public MqttReconnectionEvent(String configName) {
+ super(configName);
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttSendEvent.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttSendEvent.java
new file mode 100644
index 0000000..cdd9443
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttSendEvent.java
@@ -0,0 +1,53 @@
+package com.ruoyi.mqtt.event;
+
+import com.ruoyi.mqtt.config.MqttProperties;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+import java.util.List;
+
+/**
+ * 发送的
+ */
+public class MqttSendEvent extends ApplicationEvent {
+
+ @Getter
+ private String configName;
+
+ @Getter
+ private String sendName;
+
+ @Getter
+ private byte[] payload;
+
+ @Getter
+ private String[] params;
+
+
+ public MqttSendEvent(String configName, String sendName, byte[] payload, String... params) {
+ super(configName);
+ this.configName = configName;
+ this.sendName = sendName;
+ this.payload = payload;
+ this.params = params;
+ }
+
+ public MqttSendEvent(String configName, String sendName, byte[] payload, List params) {
+ this(configName, sendName, payload, params.toArray(new String[0]));
+ }
+
+ public MqttSendEvent(String sendName, byte[] payload, List params) {
+ this(MqttProperties.DEFAULT, sendName, payload, params.toArray(new String[0]));
+ }
+
+ public MqttSendEvent(String sendName, byte[] payload, String... params) {
+ this(MqttProperties.DEFAULT, sendName, payload, params);
+ }
+
+
+ @Override
+ public String getSource() {
+ return (String) super.getSource();
+ }
+
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttSendTopicEvent.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttSendTopicEvent.java
new file mode 100644
index 0000000..7783088
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/event/MqttSendTopicEvent.java
@@ -0,0 +1,49 @@
+package com.ruoyi.mqtt.event;
+
+import com.ruoyi.mqtt.config.MqttProperties;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+import java.util.List;
+
+/**
+ * 发送的
+ */
+public class MqttSendTopicEvent extends ApplicationEvent {
+
+ @Getter
+ private String configName;
+
+ @Getter
+ private String topic;
+
+ @Getter
+ private byte[] payload;
+
+ @Getter
+ private int qos;
+
+ @Getter
+ private boolean retained;
+
+
+ public MqttSendTopicEvent(String configName, String topic, byte[] payload, int qos, boolean retained) {
+ super(configName);
+ this.configName = configName;
+ this.topic = topic;
+ this.payload = payload;
+ this.qos = qos;
+ this.retained = retained;
+ }
+
+ public MqttSendTopicEvent(String topic, byte[] payload, int qos, boolean retained) {
+ this(MqttProperties.DEFAULT, topic, payload, qos, retained);
+ }
+
+
+ @Override
+ public String getSource() {
+ return (String) super.getSource();
+ }
+
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/Const.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/Const.java
new file mode 100644
index 0000000..9307169
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/Const.java
@@ -0,0 +1,39 @@
+package com.ruoyi.mqtt.rmi;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+/**
+ * MQTT RMI 常量接口
+ *
+ * 该接口定义了 MQTT RMI 相关的常量。
+ */
+public interface Const {
+ String REQUEST_TOPICE = "/request";
+ String RESPONSE_TOPICE = "/response";
+
+ /**
+ * 将异常的栈跟踪信息转换为字符串
+ *
+ * @param e 异常对象
+ * @return 包含完整栈跟踪信息的字符串,如果e为null则返回"null"
+ */
+ public static String getStackTraceAsString(Throwable e) {
+ // 处理null情况
+ if (e == null) {
+ return null;
+ }
+
+ // 使用StringWriter和PrintWriter来捕获栈跟踪信息
+ try (
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ ) {
+ e.printStackTrace(pw);
+ return sw.toString();
+ } catch (IOException ex) {
+ return null;
+ }
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiRequest.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiRequest.java
new file mode 100644
index 0000000..d29907b
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiRequest.java
@@ -0,0 +1,26 @@
+package com.ruoyi.mqtt.rmi;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+/**
+ * MQTT RMI 请求类
+ *
+ * 该类用于封装 MQTT RMI 调用的请求数据。
+ */
+public class MqttRmiRequest {
+
+ private String requestId;
+ private String name;
+ private String method;
+ private List args;
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiRequestSender.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiRequestSender.java
new file mode 100644
index 0000000..3139fd9
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiRequestSender.java
@@ -0,0 +1,10 @@
+package com.ruoyi.mqtt.rmi;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public interface MqttRmiRequestSender {
+
+ ObjectMapper getMapper();
+
+ MqttRmiResponse request(MqttRmiRequest request, long timeout);
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiResponse.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiResponse.java
new file mode 100644
index 0000000..aa791a3
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiResponse.java
@@ -0,0 +1,29 @@
+package com.ruoyi.mqtt.rmi;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+/**
+ * MQTT RMI 响应类
+ *
+ * 该类用于封装 MQTT RMI 调用的响应数据。
+ */
+public class MqttRmiResponse {
+
+ private String requestId;
+ private String name;
+ private String method;
+ private Boolean ok;
+ private JsonNode body;
+ private String error;
+
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiUtil.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiUtil.java
new file mode 100644
index 0000000..d2c6b69
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/MqttRmiUtil.java
@@ -0,0 +1,23 @@
+package com.ruoyi.mqtt.rmi;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ruoyi.mqtt.rmi.impl.MqttRmiConsumerImpl;
+
+public class MqttRmiUtil {
+
+ private MqttRmiUtil() {
+ }
+
+ public static MqttRmiResponse request(MqttRmiRequest request, long timeout) {
+ return MqttRmiConsumerImpl.getSender().request(request, timeout);
+ }
+
+ public static MqttRmiResponse request(MqttRmiRequest request) {
+ return MqttRmiConsumerImpl.getSender().request(request, 0);
+ }
+
+ public static ObjectMapper getMapper() {
+ return MqttRmiConsumerImpl.getSender().getMapper();
+ }
+
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiEnabled.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiEnabled.java
new file mode 100644
index 0000000..3f50dae
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiEnabled.java
@@ -0,0 +1,17 @@
+package com.ruoyi.mqtt.rmi.annotation;
+
+import com.ruoyi.mqtt.rmi.config.MqttRmiConfig;
+import org.springframework.context.annotation.Import;
+
+import java.lang.annotation.*;
+
+/**
+ * 非ruoyi项目需要使用本注解
+ */
+@Import(MqttRmiConfig.class)
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Inherited
+public @interface MqttRmiEnabled {
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiMethod.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiMethod.java
new file mode 100644
index 0000000..c06c40d
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiMethod.java
@@ -0,0 +1,32 @@
+package com.ruoyi.mqtt.rmi.annotation;
+
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+@Component
+/**
+ * MQTT RMI 方法注解
+ *
+ * 该注解用于标记一个方法为 MQTT RMI 可调用方法。
+ */
+public @interface MqttRmiMethod {
+
+ /**
+ * 远程调用的方法不支持方法重载
+ * 方法名,默认为本方法名
+ * @return
+ */
+ String value() default "";
+
+ /**
+ * 超时,默认:0 表示使用配置的默认值
+ * @return
+ */
+ long timeout() default 0;
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiProvider.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiProvider.java
new file mode 100644
index 0000000..41c6ad5
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiProvider.java
@@ -0,0 +1,23 @@
+package com.ruoyi.mqtt.rmi.annotation;
+
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+/**
+ * MQTT RMI 提供者注解
+ *
+ * 该注解用于标记一个类为 MQTT RMI 服务提供者。
+ */
+public @interface MqttRmiProvider {
+
+ /**
+ * 默认为类名
+ * @return
+ */
+ String value() default "";
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiScan.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiScan.java
new file mode 100644
index 0000000..4f08754
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiScan.java
@@ -0,0 +1,16 @@
+package com.ruoyi.mqtt.rmi.annotation;
+
+import java.lang.annotation.*;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Inherited
+/**
+ * MQTT RMI 扫描注解
+ *
+ * 该注解用于标记需要扫描的包路径,以便发现和注册 MQTT RMI 服务接口。
+ */
+public @interface MqttRmiScan {
+ String[] value() default {};
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiService.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiService.java
new file mode 100644
index 0000000..607bd67
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/annotation/MqttRmiService.java
@@ -0,0 +1,23 @@
+package com.ruoyi.mqtt.rmi.annotation;
+
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+/**
+ * MQTT RMI 服务注解
+ *
+ * 该注解用于标记一个接口为 MQTT RMI 服务接口。
+ */
+public @interface MqttRmiService {
+
+ /**
+ * 默认为类名
+ * @return
+ */
+ String value() default "";
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/config/MqttRmiConfig.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/config/MqttRmiConfig.java
new file mode 100644
index 0000000..ceae503
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/config/MqttRmiConfig.java
@@ -0,0 +1,57 @@
+package com.ruoyi.mqtt.rmi.config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ruoyi.mqtt.MqttFactory;
+import com.ruoyi.mqtt.rmi.impl.MqttRmiConsumerBeanDefinitionRegistryPostProcessor;
+import com.ruoyi.mqtt.rmi.impl.MqttRmiConsumerImpl;
+import com.ruoyi.mqtt.rmi.impl.MqttRmiProviderImpl;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.DependsOn;
+import org.springframework.core.Ordered;
+import org.springframework.core.PriorityOrdered;
+
+@Configuration
+@EnableConfigurationProperties(MqttRmiProperties.class)
+/**
+ * MQTT RMI 配置类
+ *
+ * 该类负责配置和初始化 MQTT RMI 相关的 Bean。
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class MqttRmiConfig implements PriorityOrdered {
+
+ MqttRmiProperties properties;
+
+ @Bean
+// @ConditionalOnProperty(prefix = MqttRmiProperties.PREFIX,name = "provider",havingValue = "true",matchIfMissing = false)
+ public MqttRmiProviderImpl mqttRmiProviderImpl(MqttRmiProperties properties, MqttFactory factory, ObjectMapper mapper) {
+ return new MqttRmiProviderImpl(properties, factory, mapper);
+ }
+
+ @Bean
+// @ConditionalOnProperty(prefix = MqttRmiProperties.PREFIX,name = "consumer",havingValue = "true",matchIfMissing = false)
+ public MqttRmiConsumerImpl mqttRmiConsumer(MqttRmiProperties properties, MqttFactory factory, ObjectMapper mapper, ApplicationContext act) {
+ return new MqttRmiConsumerImpl(properties, factory, mapper, act);
+ }
+
+ @Bean
+ @ConditionalOnProperty(prefix = MqttRmiProperties.PREFIX,name = "consumer",havingValue = "true",matchIfMissing = false)
+ public MqttRmiConsumerBeanDefinitionRegistryPostProcessor mqttRmiConsumerBeanDefinitionRegistryPostProcessor() {
+ return new MqttRmiConsumerBeanDefinitionRegistryPostProcessor();
+ }
+
+
+ @Override
+ public int getOrder() {
+ return Ordered.HIGHEST_PRECEDENCE; // 最高优先级
+ }
+
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/config/MqttRmiProperties.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/config/MqttRmiProperties.java
new file mode 100644
index 0000000..c29e2e7
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/config/MqttRmiProperties.java
@@ -0,0 +1,74 @@
+package com.ruoyi.mqtt.rmi.config;
+
+
+import com.ruoyi.mqtt.config.MqttProperties;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * MQTT配置属性类
+ *
+ * 该类定义了MQTT客户端的配置属性,包括是否启用、默认配置、以及各个MQTT客户端的详细配置。
+ *
+ */
+@Data
+@ConfigurationProperties(value = MqttRmiProperties.PREFIX, ignoreInvalidFields = true, ignoreUnknownFields = true)
+/**
+ * MQTT RMI 配置属性类
+ *
+ * 该类用于配置 MQTT RMI 相关的属性。
+ */
+public class MqttRmiProperties {
+
+
+ /**
+ * 配置前缀
+ */
+ public static final String PREFIX = "spring.mqtt.rmi";
+
+ /**
+ * 是否启用MQTT的远程方法调用功能
+ * 默认值: false
+ */
+ private Boolean enabled = false;
+
+ /**
+ * 是否是提供者,负责接口的实现, 默认: false
+ */
+ private Boolean provider = false;
+
+ /**
+ * 是否是消费者,负责接口的定义, 默认: false
+ */
+ private Boolean consumer = false;
+
+ /**
+ * 默认配置名称
+ * 默认值: "default"
+ */
+ private String configName = MqttProperties.DEFAULT;
+
+ /**
+ * 用于rmi的发送和订阅主题,不能含变量和通配符
+ * 请求主题会加后缀: /request
+ * 响应主题会加后缀: /response
+ */
+ private MqttProperties.Topic topic;
+
+ /**
+ * 发送请求时,主题添加的前缀
+ */
+ private String topicRequestPrefix="";
+
+ /**
+ * 没有发现方法是否发送错误, 默认: true
+ */
+ private Boolean notFindMethodSendError = true;
+
+ /**
+ * 远程方法调用默认超时(毫秒),默认: 500
+ */
+ private Long timeout = 500L;
+
+
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/exception/MqttRmiErrorException.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/exception/MqttRmiErrorException.java
new file mode 100644
index 0000000..273b104
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/exception/MqttRmiErrorException.java
@@ -0,0 +1,20 @@
+package com.ruoyi.mqtt.rmi.exception;
+
+import lombok.Getter;
+
+/**
+ * MQTT RMI 错误异常类
+ *
+ * 该类表示 MQTT RMI 调用过程中发生错误的异常。
+ */
+public class MqttRmiErrorException extends MqttRmiException {
+
+ public MqttRmiErrorException(String requestId) {
+ super(requestId);
+ }
+
+ public MqttRmiErrorException(String requestId, String message) {
+ super(requestId, message);
+ }
+
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/exception/MqttRmiException.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/exception/MqttRmiException.java
new file mode 100644
index 0000000..89ee6e4
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/exception/MqttRmiException.java
@@ -0,0 +1,25 @@
+package com.ruoyi.mqtt.rmi.exception;
+
+import lombok.Getter;
+
+/**
+ * MQTT RMI 异常类
+ *
+ * 该类是 MQTT RMI 相关异常的基类,包含请求 ID 信息。
+ */
+public class MqttRmiException extends RuntimeException {
+
+ @Getter
+ private String requestId;
+
+ public MqttRmiException(String requestId) {
+ super();
+ this.requestId = requestId;
+ }
+
+ public MqttRmiException(String requestId,String message) {
+ super(message);
+ this.requestId = requestId;
+ }
+
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/exception/MqttRmiTimeoutException.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/exception/MqttRmiTimeoutException.java
new file mode 100644
index 0000000..f00042b
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/exception/MqttRmiTimeoutException.java
@@ -0,0 +1,17 @@
+package com.ruoyi.mqtt.rmi.exception;
+
+/**
+ * MQTT RMI 超时异常类
+ *
+ * 该类表示 MQTT RMI 调用超时的异常。
+ */
+public class MqttRmiTimeoutException extends MqttRmiException {
+
+ public MqttRmiTimeoutException(String requestId) {
+ super(requestId);
+ }
+
+ public MqttRmiTimeoutException(String requestId, String message) {
+ super(requestId, message);
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiConsumerBeanDefinitionRegistryPostProcessor.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiConsumerBeanDefinitionRegistryPostProcessor.java
new file mode 100644
index 0000000..279e3f2
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiConsumerBeanDefinitionRegistryPostProcessor.java
@@ -0,0 +1,67 @@
+package com.ruoyi.mqtt.rmi.impl;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.ClassUtil;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ruoyi.mqtt.rmi.MqttRmiRequestSender;
+import com.ruoyi.mqtt.rmi.annotation.MqttRmiScan;
+import com.ruoyi.mqtt.rmi.annotation.MqttRmiService;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.beans.factory.support.BeanDefinitionRegistry;
+import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+/**
+ * MQTT RMI 消费者 Bean 定义注册后置处理器
+ *
+ * 该类负责扫描 @MqttRmiScan 注解,并为带有 @MqttRmiService 注解的接口创建代理 Bean。
+ */
+public class MqttRmiConsumerBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor {
+
+
+ @Override
+ public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
+
+ }
+
+ @Override
+ public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
+ Map scanBean = configurableListableBeanFactory.getBeansWithAnnotation(MqttRmiScan.class);
+ List basePackages = new ArrayList<>();
+
+ for (Object obj : scanBean.values()) {
+// log.info("{}={}", obj.getClass(), obj.getClass().isAnnotationPresent(MqttRmiScan.class));
+ MqttRmiScan mqttRmiScan = obj.getClass().getAnnotation(MqttRmiScan.class);
+ String[] temp = new String[]{obj.getClass().getPackage().getName()};
+ if (mqttRmiScan.value().length > 0) {
+ temp = mqttRmiScan.value();
+ }
+ Collections.addAll(basePackages, temp);
+ }
+ log.info("basePackages:{}", basePackages);
+
+
+ Set> set = new HashSet<>();
+ for (String basePackage : basePackages) {
+ set.addAll(ClassUtil.scanPackageByAnnotation(basePackage, MqttRmiService.class).stream().filter(Class::isInterface).collect(Collectors.toSet()));
+
+ }
+ if (CollUtil.isEmpty(set)) {
+ return;
+ }
+
+ log.info("mqttRmiConsumerProxy:{}", set);
+ Object bean = new MqttRmiConsumerProxy(set.toArray(new Class[0])).createProxy();
+ configurableListableBeanFactory.autowireBean(bean);
+ configurableListableBeanFactory.registerSingleton("mqttRmiConsumerProxy", bean);
+
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiConsumerImpl.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiConsumerImpl.java
new file mode 100644
index 0000000..d048093
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiConsumerImpl.java
@@ -0,0 +1,158 @@
+package com.ruoyi.mqtt.rmi.impl;
+
+
+import cn.hutool.cache.CacheUtil;
+import cn.hutool.cache.impl.TimedCache;
+import cn.hutool.core.util.IdUtil;
+import cn.hutool.core.util.ObjUtil;
+import cn.hutool.core.util.StrUtil;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ruoyi.mqtt.MqttFactory;
+import com.ruoyi.mqtt.MqttItem;
+import com.ruoyi.mqtt.rmi.MqttRmiRequest;
+import com.ruoyi.mqtt.rmi.MqttRmiRequestSender;
+import com.ruoyi.mqtt.rmi.MqttRmiResponse;
+import com.ruoyi.mqtt.rmi.config.MqttRmiProperties;
+import com.ruoyi.mqtt.rmi.exception.MqttRmiErrorException;
+import com.ruoyi.mqtt.rmi.exception.MqttRmiTimeoutException;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.context.ApplicationContext;
+import org.springframework.core.Ordered;
+import org.springframework.core.PriorityOrdered;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+
+import static com.ruoyi.mqtt.rmi.Const.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+/**
+ * MQTT RMI 消费者实现类
+ *
+ * 该类负责发送 MQTT RMI 请求,并监听和处理 MQTT 响应消息。
+ */
+public class MqttRmiConsumerImpl implements IMqttMessageListener, MqttRmiRequestSender, PriorityOrdered {
+
+ private final MqttRmiProperties properties;
+ private final MqttFactory factory;
+ private final ObjectMapper mapper;
+ private final ApplicationContext act;
+
+
+ private MqttItem mqttItem;
+ private TimedCache> requestCache;
+
+ @Getter
+ private static MqttRmiRequestSender sender;
+
+
+ @PostConstruct
+ public void init() throws Exception {
+ sender = this;
+ if (!properties.getConsumer() || !properties.getEnabled()) {
+ log.info("MqttRmiConsumer未激活");
+ return;
+ }
+ log.info("MqttRmiConsumer已激活");
+
+ mqttItem = factory.get(properties.getConfigName());
+ mqttItem.getClient().subscribe(properties.getTopic().getTopic() + RESPONSE_TOPICE, properties.getTopic().getQos(), this).waitForCompletion();
+
+ requestCache = CacheUtil.newTimedCache(properties.getTimeout());
+// requestCache = new TimedCache<>(properties.getTimeout());
+ requestCache.setListener((a, b) -> {
+ if (!b.isDone()) {
+ b.complete(null);
+ }
+ });
+ requestCache.schedulePrune(properties.getTimeout() / 10);
+ }
+
+ @Override
+ public ObjectMapper getMapper() {
+ return mapper;
+ }
+
+ public MqttRmiResponse request(MqttRmiRequest request, long timeout) {
+ if (StrUtil.isBlank(request.getRequestId())) {
+ request.setRequestId(IdUtil.nanoId());
+ }
+ CompletableFuture future = new CompletableFuture<>();
+ if (timeout > 0) {
+ requestCache.put(request.getRequestId(), future, timeout);
+ } else {
+ requestCache.put(request.getRequestId(), future);
+ }
+ send(request);
+ MqttRmiResponse ret = future.join();
+ if (ObjUtil.isNull(ret)) {
+// return MqttRmiResponse.builder()
+// .requestId(request.getRequestId())
+// .name(request.getRequestId())
+// .method(request.getRequestId())
+// .ok(false)
+// .error("timeout")
+// .build();
+ throw new MqttRmiTimeoutException(request.getRequestId());
+ }
+ requestCache.remove(request.getRequestId());
+ if (!ret.getOk()) {
+ throw new MqttRmiErrorException(ret.getRequestId(), ret.getError());
+ }
+ return ret;
+ }
+
+
+ public void send(MqttRmiRequest request) {
+ try {
+ mqttItem.getClient().publish(
+ properties.getTopicRequestPrefix() + properties.getTopic().getTopic() + REQUEST_TOPICE,
+ mapper.writeValueAsString(request).getBytes(StandardCharsets.UTF_8),
+ properties.getTopic().getQos(),
+ properties.getTopic().getRetained());
+ } catch (Exception ex) {
+ log.warn("MqttRmiRequest send error", ex);
+ }
+ }
+
+
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ String json = new String(message.getPayload(), StandardCharsets.UTF_8);
+ MqttRmiResponse response = null;
+ try {
+ response = mapper.readValue(json, MqttRmiResponse.class);
+ } catch (JsonProcessingException e) {
+ log.debug("MqttRmiConsumer json parse error: {}",
+ json);
+ return;
+ }
+ String requestId = response.getRequestId();
+ if (StrUtil.isBlank(requestId) || !requestCache.containsKey(requestId)) {
+ log.debug("MqttRmiConsumer ignore requestId: {} = {}",
+ requestId, json);
+ return;
+ }
+ try {
+ requestCache.get(requestId).complete(response);
+ } catch (Exception e) {
+ log.debug("MqttRmiConsumer complete error requestId: {} ",
+ requestId);
+ }
+
+ }
+ @Override
+ public int getOrder() {
+ return Ordered.HIGHEST_PRECEDENCE; // 最高优先级
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiConsumerProxy.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiConsumerProxy.java
new file mode 100644
index 0000000..697783f
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiConsumerProxy.java
@@ -0,0 +1,93 @@
+package com.ruoyi.mqtt.rmi.impl;
+
+import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.core.util.StrUtil;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ruoyi.mqtt.rmi.MqttRmiRequest;
+import com.ruoyi.mqtt.rmi.MqttRmiRequestSender;
+import com.ruoyi.mqtt.rmi.MqttRmiResponse;
+import com.ruoyi.mqtt.rmi.MqttRmiUtil;
+import com.ruoyi.mqtt.rmi.annotation.MqttRmiMethod;
+import com.ruoyi.mqtt.rmi.annotation.MqttRmiService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.List;
+
+@RequiredArgsConstructor
+@Slf4j
+/**
+ * MQTT RMI 消费者代理类
+ *
+ * 该类负责创建 MQTT RMI 服务的代理实例,并处理远程方法调用的逻辑。
+ */
+public class MqttRmiConsumerProxy implements InvocationHandler {
+
+ // 目标接口的Class对象
+ private final Class>[] targetInterfaces;
+
+
+ /**
+ * 创建代理实例
+ */
+ @SuppressWarnings("unchecked")
+ public T createProxy() {
+ return (T) Proxy.newProxyInstance(
+ Thread.currentThread().getContextClassLoader(),
+ targetInterfaces,
+ this
+ );
+ }
+
+ /**
+ * 代理方法逻辑
+ */
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ // 方法调用前的逻辑
+
+ Class> clazz = method.getDeclaringClass();
+ String name = clazz.getSimpleName();
+ if (clazz.isAnnotationPresent(MqttRmiService.class)) {
+ MqttRmiService mqttRmiService = clazz.getAnnotation(MqttRmiService.class);
+ if (StrUtil.isNotBlank(mqttRmiService.value())) {
+ name = mqttRmiService.value();
+ }
+ }
+
+ String methodName = method.getName();
+ long timeout = 0;
+ if (method.isAnnotationPresent(MqttRmiMethod.class)) {
+ MqttRmiMethod mqttRmiMethod = method.getAnnotation(MqttRmiMethod.class);
+ if (StrUtil.isNotBlank(mqttRmiMethod.value())) {
+ methodName = mqttRmiMethod.value();
+ }
+ timeout = mqttRmiMethod.timeout();
+ }
+ List list = new ArrayList<>();
+ if (ArrayUtil.isNotEmpty(args)) {
+ for (Object obj : args) {
+ list.add(MqttRmiUtil.getMapper().valueToTree(obj));
+ }
+ }
+
+ log.debug("MqttRmiConsumerProxy remote invoke: name={},method={},timeout={},args={}", name, methodName, timeout, list);
+ MqttRmiRequest request = MqttRmiRequest.builder()
+ .name(name)
+ .method(methodName)
+ .args(list)
+ .build();
+ MqttRmiResponse response = MqttRmiUtil.request(request, timeout);
+
+ Class> retType = method.getReturnType();
+ if (retType == void.class) {
+ return null;
+ }
+ return MqttRmiUtil.getMapper().convertValue(response.getBody(),retType);
+ }
+}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiProviderBeanPostProcessor.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiProviderBeanPostProcessor.java
new file mode 100644
index 0000000..4b446c5
--- /dev/null
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/rmi/impl/MqttRmiProviderBeanPostProcessor.java
@@ -0,0 +1,86 @@
+package com.ruoyi.mqtt.rmi.impl;
+
+
+import com.ruoyi.mqtt.rmi.annotation.MqttRmiMethod;
+import com.ruoyi.mqtt.rmi.annotation.MqttRmiProvider;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.Ordered;
+import org.springframework.core.PriorityOrdered;
+
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+@Getter
+@Configuration
+@RequiredArgsConstructor
+/**
+ * MQTT RMI 提供者 Bean 后置处理器
+ *
+ * 该类负责扫描并注册带有 @MqttRmiProvider 注解的 Bean,
+ * 以便在 MQTT 消息到达时能够调用相应的方法。
+ */
+public class MqttRmiProviderBeanPostProcessor implements BeanPostProcessor, PriorityOrdered {
+
+
+ public final static String SP = "\t";
+
+ private final static Map methodMap = new HashMap<>();
+ private final static Map beanMap = new HashMap<>();
+
+ public final static Collection getMethods() {
+ return methodMap.values();
+ }
+
+ public final static Method getMethod(String name, String method) {
+ return methodMap.get(name + SP + method);
+ }
+
+ public final static Collection