From 94f7d674524ee3f05894dc8e2743c20eb5651f60 Mon Sep 17 00:00:00 2001
From: jlzhou <12020042@qq.com>
Date: Tue, 23 Sep 2025 10:27:20 +0800
Subject: [PATCH] .
---
ruoyi-mqtt/pom.xml | 17 +-
.../main/java/com/ruoyi/mqtt/MqttUtil.java | 190 +++----------
.../com/ruoyi/mqtt/config/MqttConfig.java | 256 +++++++++---------
.../com/ruoyi/mqtt/config/MqttProperties.java | 223 +++++----------
...ot.autoconfigure.AutoConfiguration.imports | 1 -
.../com/ruoyi/cron/event/CronTaskEvent.java | 16 ++
.../com/ruoyi/cron/runner/CronRunner.java | 2 +-
7 files changed, 245 insertions(+), 460 deletions(-)
diff --git a/ruoyi-mqtt/pom.xml b/ruoyi-mqtt/pom.xml
index b2fd400..8def7bd 100644
--- a/ruoyi-mqtt/pom.xml
+++ b/ruoyi-mqtt/pom.xml
@@ -17,9 +17,8 @@
- org.eclipse.paho
- org.eclipse.paho.client.mqttv3
- 1.2.5
+ org.springframework.integration
+ spring-integration-mqtt
org.projectlombok
@@ -27,15 +26,9 @@
org.springframework.boot
- spring-boot-starter-json
-
-
- cn.hutool
- hutool-core
-
-
- cn.hutool
- hutool-cache
+ spring-boot-starter
+
+
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java
index 7449d4a..4c8f84c 100644
--- a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java
@@ -1,178 +1,70 @@
package com.ruoyi.mqtt;
-
import com.ruoyi.mqtt.config.MqttConfig;
-import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
-import java.nio.charset.StandardCharsets;
+import java.nio.charset.Charset;
-/**
- * MQTT工具类,提供便捷的MQTT消息发送和客户端获取功能。
- *
- * 该工具类封装了{@link MqttFactory}的常用操作,简化了MQTT消息的发送流程。
- * 支持发送字节数组和字符串类型的消息,并可指定服务质量(QoS)和保留标志。
- *
- */
public class MqttUtil {
-
- private MqttUtil() {
-
+ private MqttUtil(){
}
- /**
- * 获取MQTT工厂实例
- *
- * @return MQTT工厂实例
- */
- public static MqttFactory getFactory() {
- return MqttConfig.getMqttFactory();
+ public static MqttClient getClient(){
+ return MqttConfig.getClient();
}
- /**
- * 获取默认配置的MQTT操作项
- *
- * @return MQTT操作项
- */
- public static MqttItem getItem() {
- return getFactory().get();
- }
- /**
- * 根据配置名称获取MQTT操作项
- *
- * @param configName 配置名称
- * @return MQTT操作项
- */
- public static MqttItem getItem(String configName) {
- return getFactory().get(configName);
- }
- /**
- * 获取默认配置的MQTT客户端
- *
- * @return MQTT异步客户端
- */
- public static IMqttAsyncClient getClient() {
- return getItem().getClient();
- }
/**
- * 根据配置名称获取MQTT客户端
- *
- * @param configName 配置名称
- * @return MQTT异步客户端
+ * 发送消息
+ * @param topic - 主题
+ * @param message - 消息
+ * @throws MqttException
+ * @throws MqttPersistenceException
*/
- public static IMqttAsyncClient getClient(String configName) {
- return getItem(configName).getClient();
+ public static void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException{
+ if(MqttConfig.getClient()==null || !MqttConfig.getClient().isConnected()) {
+ throw new RuntimeException("mqtt未连接");
+ }
+ MqttConfig.getClient().publish(topic,message);
}
-
-
-
/**
- * 发送MQTT消息
- *
- * @param configName 配置项名称
- * @param sendName 配置的发送项名称
- * @param payload 消息负载(字节数组)
- * @param params 配置的发送项参数,替换发送项中的{0},{1}...
- * @throws MqttException MQTT异常
+ * 发送消息
+ * @param topic - 主题
+ * @param payload - 负载
+ * @param qos - qos
+ * @param retained - 保留
+ * @throws MqttException
+ * @throws MqttPersistenceException
*/
- public static IMqttDeliveryToken send(String configName, String sendName, byte[] payload, String... params)throws MqttException {
- return getFactory().send(configName, sendName, payload, params);
+ public static void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException{
+ MqttMessage message = new MqttMessage(payload);
+ message.setQos(qos);
+ message.setRetained(retained);
+ publish(topic,message);
}
- /**
- * 发送MQTT消息
- *
- * @param configName 配置项名称
- * @param sendName 配置的发送项名称
- * @param payload 消息负载(字符串)
- * @param params 配置的发送项参数,替换发送项中的{0},{1}...
- * @throws MqttException MQTT异常
- */
- public static IMqttDeliveryToken send(String configName, String sendName, String payload, String... params)throws MqttException {
- return getFactory().send(configName, sendName, payload.getBytes(StandardCharsets.UTF_8), params);
- }
- /**
- * 发送MQTT消息(使用默认配置)
- *
- * @param sendName 配置的发送项名称
- * @param payload 消息负载(字节数组)
- * @param params 配置的发送项参数,替换发送项中的{0},{1}...
- * @throws MqttException MQTT异常
- */
- public static IMqttDeliveryToken send(String sendName, byte[] payload, String... params)throws MqttException {
- return getFactory().send(sendName, payload, params);
- }
-
- /**
- * 发送MQTT消息(使用默认配置)
- *
- * @param sendName 配置的发送项名称
- * @param payload 消息负载(字符串)
- * @param params 配置的发送项参数,替换发送项中的{0},{1}...
- * @throws MqttException MQTT异常
- */
- public static IMqttDeliveryToken send(String sendName, String payload, String... params)throws MqttException {
- return getFactory().send(sendName, payload.getBytes(StandardCharsets.UTF_8), params);
- }
-
- /**
- * 发送MQTT消息
- *
- * @param configName 配置项名称
- * @param topic 主题
- * @param payload 消息负载(字节数组)
- * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次
- * @param retained 保留消息
- * @throws MqttException MQTT异常
- */
- public static IMqttDeliveryToken send(String configName, String topic, byte[] payload, int qos, boolean retained) throws MqttException{
- return getFactory().send(configName, topic, payload, qos, retained);
- }
-
- /**
- * 发送MQTT消息
- *
- * @param configName 配置项名称
- * @param topic 主题
- * @param payload 消息负载(字符串)
- * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次
- * @param retained 保留消息
- * @throws MqttException MQTT异常
- */
- public static IMqttDeliveryToken send(String configName, String topic, String payload, int qos, boolean retained) throws MqttException{
- return getFactory().send(configName, topic, payload.getBytes(StandardCharsets.UTF_8), qos, retained);
- }
-
- /**
- * 发送MQTT消息(使用默认配置)
- *
- * @param topic 主题
- * @param payload 消息负载(字节数组)
- * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次
- * @param retained 保留消息
- * @throws MqttException MQTT异常
- */
- public static IMqttDeliveryToken send(String topic, byte[] payload, int qos, boolean retained) throws MqttException{
- return getFactory().send(topic, payload, qos, retained);
- }
/**
- * 发送MQTT消息(使用默认配置)
- *
- * @param topic 主题
- * @param payload 消息负载(字符串)
- * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次
- * @param retained 保留消息
- * @throws MqttException MQTT异常
+ * 发送消息
+ * @param topic - 主题
+ * @param payload - 负载
+ * @param qos - qos
+ * @param retained - 保留
+ * @throws MqttException
+ * @throws MqttPersistenceException
*/
- public static IMqttDeliveryToken send(String topic, String payload, int qos, boolean retained) throws MqttException{
- return getFactory().send(topic, payload.getBytes(StandardCharsets.UTF_8), qos, retained);
+ public static void publish(String topic, String payload, int qos, boolean retained) throws MqttException, MqttPersistenceException{
+ MqttMessage message = new MqttMessage(payload.getBytes(Charset.forName("UTF-8")));
+ message.setQos(qos);
+ message.setRetained(retained);
+ publish(topic,message);
}
}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java
index 4f2daf2..b8efdbb 100644
--- a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java
@@ -1,181 +1,167 @@
package com.ruoyi.mqtt.config;
-import com.ruoyi.mqtt.MqttEventHandler;
-import com.ruoyi.mqtt.MqttFactory;
-import com.ruoyi.mqtt.MqttItem;
-import com.ruoyi.mqtt.MqttUtil;
-import com.ruoyi.mqtt.event.MqttEvent;
-import com.ruoyi.mqtt.event.MqttSendEvent;
-import com.ruoyi.mqtt.event.MqttSendTopicEvent;
-import lombok.Getter;
+import com.ruoyi.mqtt.MqttConnectionEvent;
+import com.ruoyi.mqtt.MqttConnectionLostEvent;
+import com.ruoyi.mqtt.MqttMessageDeliveryEvent;
+import com.ruoyi.mqtt.MqttMessageEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
-import org.springframework.core.Ordered;
-import org.springframework.core.PriorityOrdered;
+import org.springframework.integration.mqtt.support.MqttUtils;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
-@Configuration
+@Configuration(proxyBeanMethods = false)
+@ConditionalOnProperty(prefix = MqttProperties.PREFIX, name = "enabled", havingValue = "true", matchIfMissing = false)
@EnableConfigurationProperties(MqttProperties.class)
@Slf4j
@RequiredArgsConstructor
-public class MqttConfig implements MqttFactory, MqttEventHandler, PriorityOrdered {
+public class MqttConfig {
private final MqttProperties properties;
- private final ApplicationContext act;
+ private final ApplicationEventPublisher publisher;
- @Getter
- private static MqttFactory mqttFactory;
+ private static MqttClient client;
- private ScheduledExecutorService executorService;
+ private static MqttCallback mqttCallback;
- private final Map clients = new ConcurrentHashMap<>();
+ private static MqttConnectOptions options;
- @PostConstruct
- public void init() throws Exception {
- if (!properties.getEnabled()) {
- log.info("mqtt模块未激活");
- return;
- }
- log.info("mqtt模块启动中...");
- try {
- executorService = act.getBeansOfType(ScheduledExecutorService.class).values().stream().findFirst().get();
- } catch (Exception e) {
- executorService = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());
- }
- properties.getConfigs().forEach((a, b) -> {
- if (!b.getEnabled()) {
- return;
- }
- put(a, b, this);
- });
- mqttFactory = this;
- }
+ private static AtomicBoolean isReConnect = new AtomicBoolean(false);
- @Override
- public boolean next(MqttEvent event, MqttItem item) {
- log.debug("mqtt event: {} = {}", event.getConfigName(), event.getClass().getName());
- act.publishEvent(event);
- return true;
+ public static MqttClient getClient() {
+ return client;
}
- @PreDestroy
- public void destroy() {
- clients.forEach((a, b) -> {
- try {
- b.destroy();
- } catch (Exception e) {
- log.error("mqtt客户端关闭失败:" + a, e);
- }
- });
- try {
- executorService.shutdown();
- } catch (Exception e) {
- throw new RuntimeException(e);
+ @Bean
+ @ConditionalOnMissingBean(MqttClient.class)
+ public MqttClient mqttClient(MqttConnectOptions options, MqttCallback callback) throws Exception {
+ log.debug("mqtt:开始创建mqtt客户端");
+ client = new MqttClient(properties.getUrl(), properties.getClientId()+'_'+Long.toString(System.currentTimeMillis()-new Random().nextInt(),36), new MemoryPersistence());
+ try{
+ connect();
+ }catch (Exception ex) {
+ log.error("mqtt:连接异常:"+properties.getUrl(), ex);
}
+ return client;
}
- public MqttItem get(String configName) {
- return clients.get(configName);
- }
-
+ @Bean
+ public MqttCallback mqttCallback() {
+ mqttCallback = new MqttCallback() {
- public void put(String configName, MqttProperties.Config config, MqttEventHandler... handlers) {
- if (clients.containsKey(configName)) {
- throw new RuntimeException("配置项已经存在");
- }
- MqttItem mqttItem = new MqttItem(configName, config, executorService);
- if (handlers != null && handlers.length > 0) {
- Collections.addAll(mqttItem.getMessageHandlers(), handlers);
- }
- clients.put(configName, mqttItem);
- log.debug("mqtt配置项添加:" + configName);
- }
+ @Override
+ public void connectionLost(Throwable throwable) {
+ log.warn("mqtt:连接断开", throwable);
+ publisher.publishEvent(new MqttConnectionLostEvent(throwable));
+ }
- public boolean contains(String configName) {
- return clients.containsKey(configName);
- }
+ @Override
+ public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
+ log.trace("mqtt:接收到主题为{}的消息", s);
+ publisher.publishEvent(new MqttMessageEvent(s, mqttMessage));
+ }
- public void remove(String configName) {
- if (contains(configName)) {
- get(configName).destroy();
- clients.remove(configName);
- }
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ log.trace("mqtt:确认发送到主题为{}的消息", iMqttDeliveryToken.getTopics());
+ publisher.publishEvent(new MqttMessageDeliveryEvent(iMqttDeliveryToken));
+ }
+ };
+ return mqttCallback;
}
-
- public IMqttDeliveryToken send(String configName, String topic, byte[] payload, int qos, boolean retained) throws MqttException {
- MqttItem item = get(configName);
- if (item == null) {
- throw new RuntimeException("mqtt客户端未找到:" + configName);
- }
- log.debug("mqtt发送成功:configName={},topic={}", configName, topic);
- return item.getClient().publish(topic, payload, qos, retained);
+ @Bean
+ public MqttConnectOptions mqttConnectOptions() {
+ // 连接设置
+ options = new MqttConnectOptions();
+ // 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
+ // 设置为true表示每次连接服务器都是以新的身份
+ options.setCleanSession(properties.getCleanSession());
+ // 设置连接用户名
+ options.setUserName(properties.getUsername());
+ // 设置连接密码
+ options.setPassword(properties.getPassword().toCharArray());
+ // 设置超时时间,单位为秒
+ options.setConnectionTimeout(properties.getConnectionTimeout());
+ // 设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
+ options.setKeepAliveInterval(properties.getKeepAliveInterval());
+ // 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
+ options.setWill(properties.getWillTopic(), properties.getWillMessage().getBytes(), properties.getWillQos(), false);
+ // 设置重连
+ options.setAutomaticReconnect(false);
+
+ return options;
}
- public IMqttDeliveryToken send(String configName, String sendName, byte[] payload, String... params) throws MqttException {
+ public void connect() throws Exception {
- MqttItem item = get(configName);
- if (item == null) {
- throw new RuntimeException("mqtt客户端未找到:" + configName);
- }
+ log.debug("mqtt:url={},clientId={}", client.getServerURI(), client.getClientId());
+ // 设置回调
+ client.setCallback(mqttCallback);
- MqttProperties.Topic topic = item.getConfig().getSends().get(sendName);
- if (topic == null) {
- throw new RuntimeException("mqtt配置的主题未找到:" + configName + " = " + sendName);
- }
- String topicTempalte = topic.getTopic();
- if (params != null) {
- for (int i = 0; i < params.length; i++) {
- topicTempalte = topicTempalte.replace("{" + i + "}", params[i]);
+ client.connect(MqttUtils.cloneConnectOptions(options));
+ if (properties.getTopic() != null && properties.getTopic().length > 0) {
+ if (properties.getTopic().length != properties.getQos().length) {
+ throw new Exception("mqtt:订阅的主题和qos不一致");
}
+ client.subscribe(properties.getTopic(), properties.getQos());
+ log.debug("mqtt:订阅了主题={}", Arrays.toString(properties.getTopic()));
}
-
- log.debug("mqtt发送成功:configName={},sendName={},topic={}", configName, sendName, topicTempalte);
- return item.getClient().publish(topicTempalte, payload, topic.getQos(), topic.getRetained());
+ log.debug("mqtt:创建客户端成功");
+ publisher.publishEvent(new MqttConnectionEvent());
}
+ @Scheduled(fixedDelay = 5000)
+ public void reConnect() {
- @EventListener
- public void listener(MqttSendEvent event) throws MqttException {
- send(event.getConfigName(), event.getSendName(), event.getPayload(), event.getParams());
- }
-
- @EventListener
- public void listener(MqttSendTopicEvent event) throws MqttException {
- send(event.getConfigName(), event.getTopic(), event.getPayload(), event.getQos(), event.isRetained());
+ if (!client.isConnected()) {
+ try {
+ client.disconnect();
+ } catch (Exception e) {
+ }
+/* try {
+ client.close();
+ } catch (Exception e) {
+ }*/
+ try {
+ connect();
+ } catch (Exception ex) {
+ log.error("mqtt:连接异常:"+properties.getUrl(), ex);
+ }
+ }
}
-// @Scheduled(cron = "*/5 * * * * ?")
-// public void test() {
-// String s = Long.toString(System.currentTimeMillis(), 36);
-// try {
-//// act.publishEvent(new MqttSendEvent("test",s.getBytes(StandardCharsets.UTF_8),s));
-// MqttUtil.send("test", s, new String[]{s});
-// log.info("test success:{}", s);
-// } catch (Exception e) {
-// log.info("test error:" + s, e);
-// }
-// }
-
- @Override
- public int getOrder() {
- return Ordered.HIGHEST_PRECEDENCE; // 最高优先级
+ /**
+ * 断开连接
+ */
+ @PreDestroy
+ public void disConnect() {
+ try {
+ if (client != null) {
+ client.disconnect();
+ }
+ } catch (Exception e) {
+ }
}
}
diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java
index 5e8494d..75ade6a 100644
--- a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java
+++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java
@@ -1,194 +1,93 @@
package com.ruoyi.mqtt.config;
-import com.ruoyi.mqtt.MqttVersion;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
-import java.util.*;
+import java.util.UUID;
-/**
- * MQTT配置属性类
- *
- * 该类定义了MQTT客户端的配置属性,包括是否启用、默认配置、以及各个MQTT客户端的详细配置。
- *
- */
@Data
-@ConfigurationProperties(value = MqttProperties.PREFIX, ignoreInvalidFields = true, ignoreUnknownFields = true)
+@ConfigurationProperties(value = MqttProperties.PREFIX,ignoreInvalidFields = true,ignoreUnknownFields = true)
public class MqttProperties {
- public final static String DEFAULT = "default";
-
/**
* 配置前缀
*/
public static final String PREFIX = "spring.mqtt";
/**
- * 是否启用MQTT功能
- * 默认值: false
+ * 是否启用,
+ * 默认: false
*/
private Boolean enabled = false;
/**
- * 默认配置名称
- * 默认值: "default"
+ * 客户端编号(唯一)
+ * 默认: UUID
+ */
+ private String clientId = UUID.randomUUID().toString();
+
+ /**
+ * 服务器url
+ * 默认: tcp://127.0.0.1:1883
+ */
+ private String url="tcp://127.0.0.1:1883";
+
+ /**
+ * 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
+ * 设置为true表示每次连接服务器都是以新的身份
+ * 默认: true
+ */
+ private Boolean cleanSession = true;
+
+ /**
+ * 用户名
+ * 默认: admin
+ */
+ private String username = "admin";
+
+ /**
+ * 默认: 123456
+ */
+ private String password = "123456";
+
+ /**
+ * 连接超时,单位秒
+ * 默认: 100
+ */
+ private Integer connectionTimeout = 100;
+
+ /**
+ * 心跳时间 单位为秒,表示服务器每隔60秒的时间向客户端发送心跳判断客户端是否在线
+ * 默认: 60
+ */
+ private Integer keepAliveInterval = 60;
+
+ /**
+ * 遗嘱主题
+ * 默认: will/topic
*/
- private String defaultConfig = DEFAULT;
+ private String willTopic = "will/topic";
/**
- * MQTT客户端配置映射,key为配置名称,value为具体的配置项
+ * 遗嘱消息
+ * 默认: offline
*/
- private Map configs = new HashMap<>();
+ private String willMessage = "offline";
+ /**
+ * 遗嘱Qos
+ * 默认: 0
+ */
+ private Integer willQos = 0;
/**
- * MQTT主题配置类
- *
- * 该类定义了MQTT主题的相关配置,包括主题名称、服务质量(QoS)和是否保留消息。
- *
+ * 订阅的主题数组
*/
- @Data
- public static class Topic {
-
- /**
- * 订阅主题可以使用 单级通配符 "+" 和多级通配符 "#"
- * 发送主题可以使用如:{0},{1},{2},发送时使用params数组替换
- */
- private String topic;
-
- /**
- * 服务质量(QoS)
- * 默认值: 0
- */
- private Integer qos = 0;
-
- /**
- * 是否保留消息,订阅主题时无需配置
- * 默认值: false
- */
- private Boolean retained = false;
- }
+ private String[] topic;
/**
- * MQTT客户端配置类
- *
- * 该类定义了单个MQTT客户端的详细配置,包括连接信息、认证信息、心跳设置等。
- *
+ * 订阅的主题数组对应的qos
*/
- @Data
- public static class Config {
-
- /**
- * 是否启用该MQTT客户端配置
- * 默认值: true
- */
- private Boolean enabled = true;
-
- /**
- * 客户端编号(唯一)
- * 默认值: 基于系统时间生成的36进制字符串
- */
- private String clientId = Long.toString(System.currentTimeMillis(), 36);
-
- /**
- * 服务器URL
- * 默认值: tcp://127.0.0.1:1883
- */
- private String url = "tcp://127.0.0.1:1883";
-
- /**
- * 服务器集群URL列表(可选)
- */
- private List urls = new ArrayList<>();
-
- /**
- * 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
- * 设置为true表示每次连接服务器都是以新的身份
- * 默认值: true
- */
- private Boolean cleanSession = true;
-
- /**
- * 用户名
- * 默认值: admin
- */
- private String username = "admin";
-
- /**
- * 密码
- * 默认值: 123456
- */
- private String password = "123456";
-
- /**
- * 连接超时时间,单位秒
- * 默认值: 5
- */
- private Integer connectionTimeout = 5;
-
- /**
- * 心跳时间 单位为秒,表示服务器每隔60秒的时间向客户端发送心跳判断客户端是否在线
- * 默认值: 60
- */
- private Integer keepAliveInterval = 60;
-
- /**
- * 遗嘱主题
- * 默认值: will/topic
- */
- private String willTopic = "will/topic";
-
- /**
- * 遗嘱消息
- * 默认值: offline
- */
- private String willMessage = "offline";
-
- /**
- * 遗嘱消息的服务质量(QoS)
- * 默认值: 0
- */
- private Integer willQos = 0;
-
- /**
- * 订阅主题列表
- */
- private List subscribes = new ArrayList<>();
-
- /**
- * 是否自动重连
- * 默认值: true (注释掉的配置)
- */
-// private Boolean automaticReconnect = true;
-
-// private Integer maxReconnectDelay = Integer.MAX_VALUE;
-
- /**
- * 自定义WebSocket头部信息
- */
- private Properties customWebSocketHeaders = new Properties();
-
- /**
- * SSL属性配置
- */
- private Properties sslProperties = new Properties();
-
- /**
- * 发送主题映射,key为发送名称,value为具体的主题配置
- */
- private Map sends = new HashMap<>();
-
- /**
- * MQTT协议版本
- * 默认值: MQTT_VERSION_DEFAULT
- */
- private MqttVersion mqttVersion = MqttVersion.MQTT_VERSION_DEFAULT;
-
- /**
- * 最大未确认消息数量
- * 默认值: 10
- */
- private Integer maxInflight = 10;
- }
+ private int[] qos;
}
diff --git a/ruoyi-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index f3a8b93..551c9aa 100644
--- a/ruoyi-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/ruoyi-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -1,2 +1 @@
com.ruoyi.mqtt.config.MqttConfig
-com.ruoyi.mqtt.rmi.config.MqttRmiConfig
diff --git a/ruoyi-system-cron/src/main/java/com/ruoyi/cron/event/CronTaskEvent.java b/ruoyi-system-cron/src/main/java/com/ruoyi/cron/event/CronTaskEvent.java
index dc4ae42..bb88f62 100644
--- a/ruoyi-system-cron/src/main/java/com/ruoyi/cron/event/CronTaskEvent.java
+++ b/ruoyi-system-cron/src/main/java/com/ruoyi/cron/event/CronTaskEvent.java
@@ -1,8 +1,12 @@
package com.ruoyi.cron.event;
+import cn.hutool.core.collection.ListUtil;
+import com.ruoyi.common.utils.spring.SpringUtils;
+import com.ruoyi.cron.vo.CronTaskVo;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
+import java.lang.reflect.Method;
import java.util.List;
/**
@@ -26,4 +30,16 @@ public class CronTaskEvent extends ApplicationEvent {
this.paramELs = paramELs;
this.cronId = cronId;
}
+
+
+
+ public static CronTaskEvent of(Method method, String... paramEls) {
+ String taskId = CronTaskVo.id(method);
+ return new CronTaskEvent(null,taskId,ListUtil.of(paramEls));
+ }
+
+ public static void publishEvent(Method method,String... paramEls) {
+ SpringUtils.publishEvent(of(method, paramEls));
+ }
+
}
diff --git a/ruoyi-system-cron/src/main/java/com/ruoyi/cron/runner/CronRunner.java b/ruoyi-system-cron/src/main/java/com/ruoyi/cron/runner/CronRunner.java
index 6bc47b9..865756b 100644
--- a/ruoyi-system-cron/src/main/java/com/ruoyi/cron/runner/CronRunner.java
+++ b/ruoyi-system-cron/src/main/java/com/ruoyi/cron/runner/CronRunner.java
@@ -170,7 +170,7 @@ public class CronRunner implements ApplicationRunner {
// 执行成功后,继续执行子任务
- if (ObjUtil.isNotNull(event.getCronId() != null)) {
+ if (ObjUtil.isNotNull(event.getCronId())) {
List list = MongoUtil.find(CronTask.class, MongoUtil.conditions().put("pid", event.getCronId()).put("enabled", true));
if (CollUtil.isNotEmpty(list)) {
list.forEach(cronTask -> {