diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/sse/SseAction.java b/ruoyi-common/src/main/java/com/ruoyi/common/sse/SseAction.java new file mode 100644 index 0000000..3484002 --- /dev/null +++ b/ruoyi-common/src/main/java/com/ruoyi/common/sse/SseAction.java @@ -0,0 +1,17 @@ +package com.ruoyi.common.sse; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@AllArgsConstructor +@NoArgsConstructor +@Builder +@Data +public class SseAction { + + private String action; + private T data; + +} diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/sse/SseApiSupport.java b/ruoyi-common/src/main/java/com/ruoyi/common/sse/SseApiSupport.java new file mode 100644 index 0000000..434d433 --- /dev/null +++ b/ruoyi-common/src/main/java/com/ruoyi/common/sse/SseApiSupport.java @@ -0,0 +1,116 @@ +package com.ruoyi.common.sse; + +import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.util.ObjUtil; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.PathVariable; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public abstract class SseApiSupport { + protected final List> emitters = new CopyOnWriteArrayList<>(); + + /** + * 订阅某种类型的消息 + * + * @param timeout 会话超时 + * @param type 消息类型 + * @return + */ + protected final SseEmitter handleSubscribe(long timeout, T type) { + SseEmitter emitter = new SseEmitter(timeout, type); + emitters.add(emitter); + emitter.onTimeout(() -> { + emitters.remove(emitter); + }); + emitter.onCompletion(() -> { + emitters.remove(emitter); + }); + emitter.onError(e -> { + emitters.remove(emitter); + }); + return emitter; + } + + /** + * 订阅某种类型的消息 + * 超时:Long.MAX_VALUE + * + * @param type 消息类型 + * @return + */ + protected final SseEmitter handleSubscribe(T type) { + return handleSubscribe(Long.MAX_VALUE, type); + } + + /** + * 发送某种类型的action消息 + * + * @param type 消息类型 + * @param action action消息 + */ + public final void send(T type, SseAction action) { + List> removes = ListUtil.list(true); + for (SseEmitter e : emitters) { + if (ObjUtil.isNull(type) || ObjUtil.isNull(e.getType()) || ObjUtil.equals(type, e.getType())) { + try { + e.send(action, MediaType.APPLICATION_JSON); + } catch (Exception e1) { + removes.add(e); + } + } + } + emitters.removeAll(removes); + } + + /** + * 发送某种类型的action消息 + * + * @param type + * @param action + * @param data + */ + public final void send(T type, String action, Object data) { + send(type, SseAction.builder().action(action).data(data).build()); + } + + /** + * 发送某种类型的action=null的消息 + * + * @param type + * @param data + */ + public final void send(T type, Object data) { + send(type, SseAction.builder().data(data).build()); + } + + /** + * 发送null类型(广播)的action消息 + * + * @param action + */ + public final void sendAll(SseAction action) { + send(null, action); + } + + + /** + * 发送null类型(广播)的action消息 + * + * @param action + * @param data + */ + public final void sendAll(String action, Object data) { + sendAll(SseAction.builder().action(action).data(data).build()); + } + + /** + * 发送null类型(广播)的action=null的消息 + * + * @param data + */ + public final void sendAll(Object data) { + sendAll(SseAction.builder().data(data).build()); + } +} diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/sse/SseEmitter.java b/ruoyi-common/src/main/java/com/ruoyi/common/sse/SseEmitter.java new file mode 100644 index 0000000..82b39fb --- /dev/null +++ b/ruoyi-common/src/main/java/com/ruoyi/common/sse/SseEmitter.java @@ -0,0 +1,28 @@ +package com.ruoyi.common.sse; + +import lombok.Getter; + +/** + * 带类型的SseEmitter + * @param + */ +public class SseEmitter extends org.springframework.web.servlet.mvc.method.annotation.SseEmitter { + /** + * 消息类型 + */ + @Getter + private T type; + + public SseEmitter(long timeout, T type) { + super(timeout); + this.type = type; + } + + public SseEmitter(T id) { + this(Long.MAX_VALUE, id); + } + + public SseEmitter() { + this(Long.MAX_VALUE, null); + } +}