update 带类型的SSE和带action的消息支持

master
管理员 12 months ago
parent 6822f0a7a8
commit 9ee30e5de3

@ -0,0 +1,17 @@
package com.ruoyi.common.sse;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class SseAction<T> {
private String action;
private T data;
}

@ -0,0 +1,116 @@
package com.ruoyi.common.sse;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjUtil;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PathVariable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public abstract class SseApiSupport<T> {
protected final List<SseEmitter<T>> emitters = new CopyOnWriteArrayList<>();
/**
*
*
* @param timeout
* @param type
* @return
*/
protected final SseEmitter<T> handleSubscribe(long timeout, T type) {
SseEmitter<T> emitter = new SseEmitter(timeout, type);
emitters.add(emitter);
emitter.onTimeout(() -> {
emitters.remove(emitter);
});
emitter.onCompletion(() -> {
emitters.remove(emitter);
});
emitter.onError(e -> {
emitters.remove(emitter);
});
return emitter;
}
/**
*
* Long.MAX_VALUE
*
* @param type
* @return
*/
protected final SseEmitter<T> handleSubscribe(T type) {
return handleSubscribe(Long.MAX_VALUE, type);
}
/**
* action
*
* @param type
* @param action action
*/
public final void send(T type, SseAction<?> action) {
List<SseEmitter<T>> removes = ListUtil.list(true);
for (SseEmitter<T> e : emitters) {
if (ObjUtil.isNull(type) || ObjUtil.isNull(e.getType()) || ObjUtil.equals(type, e.getType())) {
try {
e.send(action, MediaType.APPLICATION_JSON);
} catch (Exception e1) {
removes.add(e);
}
}
}
emitters.removeAll(removes);
}
/**
* action
*
* @param type
* @param action
* @param data
*/
public final void send(T type, String action, Object data) {
send(type, SseAction.builder().action(action).data(data).build());
}
/**
* action=null
*
* @param type
* @param data
*/
public final void send(T type, Object data) {
send(type, SseAction.builder().data(data).build());
}
/**
* null(广)action
*
* @param action
*/
public final void sendAll(SseAction<?> action) {
send(null, action);
}
/**
* null(广)action
*
* @param action
* @param data
*/
public final void sendAll(String action, Object data) {
sendAll(SseAction.builder().action(action).data(data).build());
}
/**
* null(广)action=null
*
* @param data
*/
public final void sendAll(Object data) {
sendAll(SseAction.builder().data(data).build());
}
}

@ -0,0 +1,28 @@
package com.ruoyi.common.sse;
import lombok.Getter;
/**
* SseEmitter
* @param <T>
*/
public class SseEmitter<T> extends org.springframework.web.servlet.mvc.method.annotation.SseEmitter {
/**
*
*/
@Getter
private T type;
public SseEmitter(long timeout, T type) {
super(timeout);
this.type = type;
}
public SseEmitter(T id) {
this(Long.MAX_VALUE, id);
}
public SseEmitter() {
this(Long.MAX_VALUE, null);
}
}
Loading…
Cancel
Save