diff --git a/ruoyi-mqtt/pom.xml b/ruoyi-mqtt/pom.xml index b2fd400..8def7bd 100644 --- a/ruoyi-mqtt/pom.xml +++ b/ruoyi-mqtt/pom.xml @@ -17,9 +17,8 @@ - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.2.5 + org.springframework.integration + spring-integration-mqtt org.projectlombok @@ -27,15 +26,9 @@ org.springframework.boot - spring-boot-starter-json - - - cn.hutool - hutool-core - - - cn.hutool - hutool-cache + spring-boot-starter + + diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java index 7449d4a..4c8f84c 100644 --- a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java +++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/MqttUtil.java @@ -1,178 +1,70 @@ package com.ruoyi.mqtt; - import com.ruoyi.mqtt.config.MqttConfig; -import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttPersistenceException; -import java.nio.charset.StandardCharsets; +import java.nio.charset.Charset; -/** - * MQTT工具类,提供便捷的MQTT消息发送和客户端获取功能。 - *

- * 该工具类封装了{@link MqttFactory}的常用操作,简化了MQTT消息的发送流程。 - * 支持发送字节数组和字符串类型的消息,并可指定服务质量(QoS)和保留标志。 - *

- */ public class MqttUtil { - - private MqttUtil() { - + private MqttUtil(){ } - /** - * 获取MQTT工厂实例 - * - * @return MQTT工厂实例 - */ - public static MqttFactory getFactory() { - return MqttConfig.getMqttFactory(); + public static MqttClient getClient(){ + return MqttConfig.getClient(); } - /** - * 获取默认配置的MQTT操作项 - * - * @return MQTT操作项 - */ - public static MqttItem getItem() { - return getFactory().get(); - } - /** - * 根据配置名称获取MQTT操作项 - * - * @param configName 配置名称 - * @return MQTT操作项 - */ - public static MqttItem getItem(String configName) { - return getFactory().get(configName); - } - /** - * 获取默认配置的MQTT客户端 - * - * @return MQTT异步客户端 - */ - public static IMqttAsyncClient getClient() { - return getItem().getClient(); - } /** - * 根据配置名称获取MQTT客户端 - * - * @param configName 配置名称 - * @return MQTT异步客户端 + * 发送消息 + * @param topic - 主题 + * @param message - 消息 + * @throws MqttException + * @throws MqttPersistenceException */ - public static IMqttAsyncClient getClient(String configName) { - return getItem(configName).getClient(); + public static void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException{ + if(MqttConfig.getClient()==null || !MqttConfig.getClient().isConnected()) { + throw new RuntimeException("mqtt未连接"); + } + MqttConfig.getClient().publish(topic,message); } - - - /** - * 发送MQTT消息 - * - * @param configName 配置项名称 - * @param sendName 配置的发送项名称 - * @param payload 消息负载(字节数组) - * @param params 配置的发送项参数,替换发送项中的{0},{1}... - * @throws MqttException MQTT异常 + * 发送消息 + * @param topic - 主题 + * @param payload - 负载 + * @param qos - qos + * @param retained - 保留 + * @throws MqttException + * @throws MqttPersistenceException */ - public static IMqttDeliveryToken send(String configName, String sendName, byte[] payload, String... params)throws MqttException { - return getFactory().send(configName, sendName, payload, params); + public static void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException{ + MqttMessage message = new MqttMessage(payload); + message.setQos(qos); + message.setRetained(retained); + publish(topic,message); } - /** - * 发送MQTT消息 - * - * @param configName 配置项名称 - * @param sendName 配置的发送项名称 - * @param payload 消息负载(字符串) - * @param params 配置的发送项参数,替换发送项中的{0},{1}... - * @throws MqttException MQTT异常 - */ - public static IMqttDeliveryToken send(String configName, String sendName, String payload, String... params)throws MqttException { - return getFactory().send(configName, sendName, payload.getBytes(StandardCharsets.UTF_8), params); - } - /** - * 发送MQTT消息(使用默认配置) - * - * @param sendName 配置的发送项名称 - * @param payload 消息负载(字节数组) - * @param params 配置的发送项参数,替换发送项中的{0},{1}... - * @throws MqttException MQTT异常 - */ - public static IMqttDeliveryToken send(String sendName, byte[] payload, String... params)throws MqttException { - return getFactory().send(sendName, payload, params); - } - - /** - * 发送MQTT消息(使用默认配置) - * - * @param sendName 配置的发送项名称 - * @param payload 消息负载(字符串) - * @param params 配置的发送项参数,替换发送项中的{0},{1}... - * @throws MqttException MQTT异常 - */ - public static IMqttDeliveryToken send(String sendName, String payload, String... params)throws MqttException { - return getFactory().send(sendName, payload.getBytes(StandardCharsets.UTF_8), params); - } - - /** - * 发送MQTT消息 - * - * @param configName 配置项名称 - * @param topic 主题 - * @param payload 消息负载(字节数组) - * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次 - * @param retained 保留消息 - * @throws MqttException MQTT异常 - */ - public static IMqttDeliveryToken send(String configName, String topic, byte[] payload, int qos, boolean retained) throws MqttException{ - return getFactory().send(configName, topic, payload, qos, retained); - } - - /** - * 发送MQTT消息 - * - * @param configName 配置项名称 - * @param topic 主题 - * @param payload 消息负载(字符串) - * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次 - * @param retained 保留消息 - * @throws MqttException MQTT异常 - */ - public static IMqttDeliveryToken send(String configName, String topic, String payload, int qos, boolean retained) throws MqttException{ - return getFactory().send(configName, topic, payload.getBytes(StandardCharsets.UTF_8), qos, retained); - } - - /** - * 发送MQTT消息(使用默认配置) - * - * @param topic 主题 - * @param payload 消息负载(字节数组) - * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次 - * @param retained 保留消息 - * @throws MqttException MQTT异常 - */ - public static IMqttDeliveryToken send(String topic, byte[] payload, int qos, boolean retained) throws MqttException{ - return getFactory().send(topic, payload, qos, retained); - } /** - * 发送MQTT消息(使用默认配置) - * - * @param topic 主题 - * @param payload 消息负载(字符串) - * @param qos 服务质量等级 QoS 0:最多一次 QoS 1:至少一次 QoS 2:恰好一次 - * @param retained 保留消息 - * @throws MqttException MQTT异常 + * 发送消息 + * @param topic - 主题 + * @param payload - 负载 + * @param qos - qos + * @param retained - 保留 + * @throws MqttException + * @throws MqttPersistenceException */ - public static IMqttDeliveryToken send(String topic, String payload, int qos, boolean retained) throws MqttException{ - return getFactory().send(topic, payload.getBytes(StandardCharsets.UTF_8), qos, retained); + public static void publish(String topic, String payload, int qos, boolean retained) throws MqttException, MqttPersistenceException{ + MqttMessage message = new MqttMessage(payload.getBytes(Charset.forName("UTF-8"))); + message.setQos(qos); + message.setRetained(retained); + publish(topic,message); } } diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java index 4f2daf2..b8efdbb 100644 --- a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java +++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttConfig.java @@ -1,181 +1,167 @@ package com.ruoyi.mqtt.config; -import com.ruoyi.mqtt.MqttEventHandler; -import com.ruoyi.mqtt.MqttFactory; -import com.ruoyi.mqtt.MqttItem; -import com.ruoyi.mqtt.MqttUtil; -import com.ruoyi.mqtt.event.MqttEvent; -import com.ruoyi.mqtt.event.MqttSendEvent; -import com.ruoyi.mqtt.event.MqttSendTopicEvent; -import lombok.Getter; +import com.ruoyi.mqtt.MqttConnectionEvent; +import com.ruoyi.mqtt.MqttConnectionLostEvent; +import com.ruoyi.mqtt.MqttMessageDeliveryEvent; +import com.ruoyi.mqtt.MqttMessageEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; -import org.springframework.core.Ordered; -import org.springframework.core.PriorityOrdered; +import org.springframework.integration.mqtt.support.MqttUtils; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; -@Configuration +@Configuration(proxyBeanMethods = false) +@ConditionalOnProperty(prefix = MqttProperties.PREFIX, name = "enabled", havingValue = "true", matchIfMissing = false) @EnableConfigurationProperties(MqttProperties.class) @Slf4j @RequiredArgsConstructor -public class MqttConfig implements MqttFactory, MqttEventHandler, PriorityOrdered { +public class MqttConfig { private final MqttProperties properties; - private final ApplicationContext act; + private final ApplicationEventPublisher publisher; - @Getter - private static MqttFactory mqttFactory; + private static MqttClient client; - private ScheduledExecutorService executorService; + private static MqttCallback mqttCallback; - private final Map clients = new ConcurrentHashMap<>(); + private static MqttConnectOptions options; - @PostConstruct - public void init() throws Exception { - if (!properties.getEnabled()) { - log.info("mqtt模块未激活"); - return; - } - log.info("mqtt模块启动中..."); - try { - executorService = act.getBeansOfType(ScheduledExecutorService.class).values().stream().findFirst().get(); - } catch (Exception e) { - executorService = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors()); - } - properties.getConfigs().forEach((a, b) -> { - if (!b.getEnabled()) { - return; - } - put(a, b, this); - }); - mqttFactory = this; - } + private static AtomicBoolean isReConnect = new AtomicBoolean(false); - @Override - public boolean next(MqttEvent event, MqttItem item) { - log.debug("mqtt event: {} = {}", event.getConfigName(), event.getClass().getName()); - act.publishEvent(event); - return true; + public static MqttClient getClient() { + return client; } - @PreDestroy - public void destroy() { - clients.forEach((a, b) -> { - try { - b.destroy(); - } catch (Exception e) { - log.error("mqtt客户端关闭失败:" + a, e); - } - }); - try { - executorService.shutdown(); - } catch (Exception e) { - throw new RuntimeException(e); + @Bean + @ConditionalOnMissingBean(MqttClient.class) + public MqttClient mqttClient(MqttConnectOptions options, MqttCallback callback) throws Exception { + log.debug("mqtt:开始创建mqtt客户端"); + client = new MqttClient(properties.getUrl(), properties.getClientId()+'_'+Long.toString(System.currentTimeMillis()-new Random().nextInt(),36), new MemoryPersistence()); + try{ + connect(); + }catch (Exception ex) { + log.error("mqtt:连接异常:"+properties.getUrl(), ex); } + return client; } - public MqttItem get(String configName) { - return clients.get(configName); - } - + @Bean + public MqttCallback mqttCallback() { + mqttCallback = new MqttCallback() { - public void put(String configName, MqttProperties.Config config, MqttEventHandler... handlers) { - if (clients.containsKey(configName)) { - throw new RuntimeException("配置项已经存在"); - } - MqttItem mqttItem = new MqttItem(configName, config, executorService); - if (handlers != null && handlers.length > 0) { - Collections.addAll(mqttItem.getMessageHandlers(), handlers); - } - clients.put(configName, mqttItem); - log.debug("mqtt配置项添加:" + configName); - } + @Override + public void connectionLost(Throwable throwable) { + log.warn("mqtt:连接断开", throwable); + publisher.publishEvent(new MqttConnectionLostEvent(throwable)); + } - public boolean contains(String configName) { - return clients.containsKey(configName); - } + @Override + public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { + log.trace("mqtt:接收到主题为{}的消息", s); + publisher.publishEvent(new MqttMessageEvent(s, mqttMessage)); + } - public void remove(String configName) { - if (contains(configName)) { - get(configName).destroy(); - clients.remove(configName); - } + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.trace("mqtt:确认发送到主题为{}的消息", iMqttDeliveryToken.getTopics()); + publisher.publishEvent(new MqttMessageDeliveryEvent(iMqttDeliveryToken)); + } + }; + return mqttCallback; } - - public IMqttDeliveryToken send(String configName, String topic, byte[] payload, int qos, boolean retained) throws MqttException { - MqttItem item = get(configName); - if (item == null) { - throw new RuntimeException("mqtt客户端未找到:" + configName); - } - log.debug("mqtt发送成功:configName={},topic={}", configName, topic); - return item.getClient().publish(topic, payload, qos, retained); + @Bean + public MqttConnectOptions mqttConnectOptions() { + // 连接设置 + options = new MqttConnectOptions(); + // 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 + // 设置为true表示每次连接服务器都是以新的身份 + options.setCleanSession(properties.getCleanSession()); + // 设置连接用户名 + options.setUserName(properties.getUsername()); + // 设置连接密码 + options.setPassword(properties.getPassword().toCharArray()); + // 设置超时时间,单位为秒 + options.setConnectionTimeout(properties.getConnectionTimeout()); + // 设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线 + options.setKeepAliveInterval(properties.getKeepAliveInterval()); + // 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 + options.setWill(properties.getWillTopic(), properties.getWillMessage().getBytes(), properties.getWillQos(), false); + // 设置重连 + options.setAutomaticReconnect(false); + + return options; } - public IMqttDeliveryToken send(String configName, String sendName, byte[] payload, String... params) throws MqttException { + public void connect() throws Exception { - MqttItem item = get(configName); - if (item == null) { - throw new RuntimeException("mqtt客户端未找到:" + configName); - } + log.debug("mqtt:url={},clientId={}", client.getServerURI(), client.getClientId()); + // 设置回调 + client.setCallback(mqttCallback); - MqttProperties.Topic topic = item.getConfig().getSends().get(sendName); - if (topic == null) { - throw new RuntimeException("mqtt配置的主题未找到:" + configName + " = " + sendName); - } - String topicTempalte = topic.getTopic(); - if (params != null) { - for (int i = 0; i < params.length; i++) { - topicTempalte = topicTempalte.replace("{" + i + "}", params[i]); + client.connect(MqttUtils.cloneConnectOptions(options)); + if (properties.getTopic() != null && properties.getTopic().length > 0) { + if (properties.getTopic().length != properties.getQos().length) { + throw new Exception("mqtt:订阅的主题和qos不一致"); } + client.subscribe(properties.getTopic(), properties.getQos()); + log.debug("mqtt:订阅了主题={}", Arrays.toString(properties.getTopic())); } - - log.debug("mqtt发送成功:configName={},sendName={},topic={}", configName, sendName, topicTempalte); - return item.getClient().publish(topicTempalte, payload, topic.getQos(), topic.getRetained()); + log.debug("mqtt:创建客户端成功"); + publisher.publishEvent(new MqttConnectionEvent()); } + @Scheduled(fixedDelay = 5000) + public void reConnect() { - @EventListener - public void listener(MqttSendEvent event) throws MqttException { - send(event.getConfigName(), event.getSendName(), event.getPayload(), event.getParams()); - } - - @EventListener - public void listener(MqttSendTopicEvent event) throws MqttException { - send(event.getConfigName(), event.getTopic(), event.getPayload(), event.getQos(), event.isRetained()); + if (!client.isConnected()) { + try { + client.disconnect(); + } catch (Exception e) { + } +/* try { + client.close(); + } catch (Exception e) { + }*/ + try { + connect(); + } catch (Exception ex) { + log.error("mqtt:连接异常:"+properties.getUrl(), ex); + } + } } -// @Scheduled(cron = "*/5 * * * * ?") -// public void test() { -// String s = Long.toString(System.currentTimeMillis(), 36); -// try { -//// act.publishEvent(new MqttSendEvent("test",s.getBytes(StandardCharsets.UTF_8),s)); -// MqttUtil.send("test", s, new String[]{s}); -// log.info("test success:{}", s); -// } catch (Exception e) { -// log.info("test error:" + s, e); -// } -// } - - @Override - public int getOrder() { - return Ordered.HIGHEST_PRECEDENCE; // 最高优先级 + /** + * 断开连接 + */ + @PreDestroy + public void disConnect() { + try { + if (client != null) { + client.disconnect(); + } + } catch (Exception e) { + } } } diff --git a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java index 5e8494d..75ade6a 100644 --- a/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java +++ b/ruoyi-mqtt/src/main/java/com/ruoyi/mqtt/config/MqttProperties.java @@ -1,194 +1,93 @@ package com.ruoyi.mqtt.config; -import com.ruoyi.mqtt.MqttVersion; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; -import java.util.*; +import java.util.UUID; -/** - * MQTT配置属性类 - *

- * 该类定义了MQTT客户端的配置属性,包括是否启用、默认配置、以及各个MQTT客户端的详细配置。 - *

- */ @Data -@ConfigurationProperties(value = MqttProperties.PREFIX, ignoreInvalidFields = true, ignoreUnknownFields = true) +@ConfigurationProperties(value = MqttProperties.PREFIX,ignoreInvalidFields = true,ignoreUnknownFields = true) public class MqttProperties { - public final static String DEFAULT = "default"; - /** * 配置前缀 */ public static final String PREFIX = "spring.mqtt"; /** - * 是否启用MQTT功能 - * 默认值: false + * 是否启用, + * 默认: false */ private Boolean enabled = false; /** - * 默认配置名称 - * 默认值: "default" + * 客户端编号(唯一) + * 默认: UUID + */ + private String clientId = UUID.randomUUID().toString(); + + /** + * 服务器url + * 默认: tcp://127.0.0.1:1883 + */ + private String url="tcp://127.0.0.1:1883"; + + /** + * 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 + * 设置为true表示每次连接服务器都是以新的身份 + * 默认: true + */ + private Boolean cleanSession = true; + + /** + * 用户名 + * 默认: admin + */ + private String username = "admin"; + + /** + * 默认: 123456 + */ + private String password = "123456"; + + /** + * 连接超时,单位秒 + * 默认: 100 + */ + private Integer connectionTimeout = 100; + + /** + * 心跳时间 单位为秒,表示服务器每隔60秒的时间向客户端发送心跳判断客户端是否在线 + * 默认: 60 + */ + private Integer keepAliveInterval = 60; + + /** + * 遗嘱主题 + * 默认: will/topic */ - private String defaultConfig = DEFAULT; + private String willTopic = "will/topic"; /** - * MQTT客户端配置映射,key为配置名称,value为具体的配置项 + * 遗嘱消息 + * 默认: offline */ - private Map configs = new HashMap<>(); + private String willMessage = "offline"; + /** + * 遗嘱Qos + * 默认: 0 + */ + private Integer willQos = 0; /** - * MQTT主题配置类 - *

- * 该类定义了MQTT主题的相关配置,包括主题名称、服务质量(QoS)和是否保留消息。 - *

+ * 订阅的主题数组 */ - @Data - public static class Topic { - - /** - * 订阅主题可以使用 单级通配符 "+" 和多级通配符 "#" - * 发送主题可以使用如:{0},{1},{2},发送时使用params数组替换 - */ - private String topic; - - /** - * 服务质量(QoS) - * 默认值: 0 - */ - private Integer qos = 0; - - /** - * 是否保留消息,订阅主题时无需配置 - * 默认值: false - */ - private Boolean retained = false; - } + private String[] topic; /** - * MQTT客户端配置类 - *

- * 该类定义了单个MQTT客户端的详细配置,包括连接信息、认证信息、心跳设置等。 - *

+ * 订阅的主题数组对应的qos */ - @Data - public static class Config { - - /** - * 是否启用该MQTT客户端配置 - * 默认值: true - */ - private Boolean enabled = true; - - /** - * 客户端编号(唯一) - * 默认值: 基于系统时间生成的36进制字符串 - */ - private String clientId = Long.toString(System.currentTimeMillis(), 36); - - /** - * 服务器URL - * 默认值: tcp://127.0.0.1:1883 - */ - private String url = "tcp://127.0.0.1:1883"; - - /** - * 服务器集群URL列表(可选) - */ - private List urls = new ArrayList<>(); - - /** - * 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 - * 设置为true表示每次连接服务器都是以新的身份 - * 默认值: true - */ - private Boolean cleanSession = true; - - /** - * 用户名 - * 默认值: admin - */ - private String username = "admin"; - - /** - * 密码 - * 默认值: 123456 - */ - private String password = "123456"; - - /** - * 连接超时时间,单位秒 - * 默认值: 5 - */ - private Integer connectionTimeout = 5; - - /** - * 心跳时间 单位为秒,表示服务器每隔60秒的时间向客户端发送心跳判断客户端是否在线 - * 默认值: 60 - */ - private Integer keepAliveInterval = 60; - - /** - * 遗嘱主题 - * 默认值: will/topic - */ - private String willTopic = "will/topic"; - - /** - * 遗嘱消息 - * 默认值: offline - */ - private String willMessage = "offline"; - - /** - * 遗嘱消息的服务质量(QoS) - * 默认值: 0 - */ - private Integer willQos = 0; - - /** - * 订阅主题列表 - */ - private List subscribes = new ArrayList<>(); - - /** - * 是否自动重连 - * 默认值: true (注释掉的配置) - */ -// private Boolean automaticReconnect = true; - -// private Integer maxReconnectDelay = Integer.MAX_VALUE; - - /** - * 自定义WebSocket头部信息 - */ - private Properties customWebSocketHeaders = new Properties(); - - /** - * SSL属性配置 - */ - private Properties sslProperties = new Properties(); - - /** - * 发送主题映射,key为发送名称,value为具体的主题配置 - */ - private Map sends = new HashMap<>(); - - /** - * MQTT协议版本 - * 默认值: MQTT_VERSION_DEFAULT - */ - private MqttVersion mqttVersion = MqttVersion.MQTT_VERSION_DEFAULT; - - /** - * 最大未确认消息数量 - * 默认值: 10 - */ - private Integer maxInflight = 10; - } + private int[] qos; } diff --git a/ruoyi-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index f3a8b93..551c9aa 100644 --- a/ruoyi-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/ruoyi-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,2 +1 @@ com.ruoyi.mqtt.config.MqttConfig -com.ruoyi.mqtt.rmi.config.MqttRmiConfig diff --git a/ruoyi-system-cron/src/main/java/com/ruoyi/cron/event/CronTaskEvent.java b/ruoyi-system-cron/src/main/java/com/ruoyi/cron/event/CronTaskEvent.java index dc4ae42..bb88f62 100644 --- a/ruoyi-system-cron/src/main/java/com/ruoyi/cron/event/CronTaskEvent.java +++ b/ruoyi-system-cron/src/main/java/com/ruoyi/cron/event/CronTaskEvent.java @@ -1,8 +1,12 @@ package com.ruoyi.cron.event; +import cn.hutool.core.collection.ListUtil; +import com.ruoyi.common.utils.spring.SpringUtils; +import com.ruoyi.cron.vo.CronTaskVo; import lombok.Getter; import org.springframework.context.ApplicationEvent; +import java.lang.reflect.Method; import java.util.List; /** @@ -26,4 +30,16 @@ public class CronTaskEvent extends ApplicationEvent { this.paramELs = paramELs; this.cronId = cronId; } + + + + public static CronTaskEvent of(Method method, String... paramEls) { + String taskId = CronTaskVo.id(method); + return new CronTaskEvent(null,taskId,ListUtil.of(paramEls)); + } + + public static void publishEvent(Method method,String... paramEls) { + SpringUtils.publishEvent(of(method, paramEls)); + } + } diff --git a/ruoyi-system-cron/src/main/java/com/ruoyi/cron/runner/CronRunner.java b/ruoyi-system-cron/src/main/java/com/ruoyi/cron/runner/CronRunner.java index 6bc47b9..865756b 100644 --- a/ruoyi-system-cron/src/main/java/com/ruoyi/cron/runner/CronRunner.java +++ b/ruoyi-system-cron/src/main/java/com/ruoyi/cron/runner/CronRunner.java @@ -170,7 +170,7 @@ public class CronRunner implements ApplicationRunner { // 执行成功后,继续执行子任务 - if (ObjUtil.isNotNull(event.getCronId() != null)) { + if (ObjUtil.isNotNull(event.getCronId())) { List list = MongoUtil.find(CronTask.class, MongoUtil.conditions().put("pid", event.getCronId()).put("enabled", true)); if (CollUtil.isNotEmpty(list)) { list.forEach(cronTask -> {