update 使用SSE重写实时定时任务日志输出

master
管理员 12 months ago
parent 9ee30e5de3
commit a3a418ad1b

@ -14,6 +14,7 @@
"url": "https://gitee.com/JavaLionLi/RuoYi-Vue-Plus-UI.git" "url": "https://gitee.com/JavaLionLi/RuoYi-Vue-Plus-UI.git"
}, },
"dependencies": { "dependencies": {
"@microsoft/fetch-event-source": "2.0.1",
"@element-plus/icons-vue": "2.3.1", "@element-plus/icons-vue": "2.3.1",
"@vueup/vue-quill": "1.1.0", "@vueup/vue-quill": "1.1.0",
"@vueuse/core": "9.5.0", "@vueuse/core": "9.5.0",

@ -11,6 +11,9 @@ importers:
'@element-plus/icons-vue': '@element-plus/icons-vue':
specifier: 2.3.1 specifier: 2.3.1
version: 2.3.1(vue@3.5.13) version: 2.3.1(vue@3.5.13)
'@microsoft/fetch-event-source':
specifier: 2.0.1
version: 2.0.1
'@vueup/vue-quill': '@vueup/vue-quill':
specifier: 1.1.0 specifier: 1.1.0
version: 1.1.0(vue@3.5.13) version: 1.1.0(vue@3.5.13)
@ -278,6 +281,9 @@ packages:
'@jridgewell/sourcemap-codec@1.5.0': '@jridgewell/sourcemap-codec@1.5.0':
resolution: {integrity: sha512-gv3ZRaISU3fjPAgNsriBRqGWQL6quFx04YMPW/zD8XMLsU32mhCCbfbO6KZFLjvYpCZ8zyDEgqsgf+PwPaM7GQ==} resolution: {integrity: sha512-gv3ZRaISU3fjPAgNsriBRqGWQL6quFx04YMPW/zD8XMLsU32mhCCbfbO6KZFLjvYpCZ8zyDEgqsgf+PwPaM7GQ==}
'@microsoft/fetch-event-source@2.0.1':
resolution: {integrity: sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==}
'@nodelib/fs.scandir@2.1.5': '@nodelib/fs.scandir@2.1.5':
resolution: {integrity: sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g==} resolution: {integrity: sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g==}
engines: {node: '>= 8'} engines: {node: '>= 8'}
@ -1943,6 +1949,8 @@ snapshots:
'@jridgewell/sourcemap-codec@1.5.0': {} '@jridgewell/sourcemap-codec@1.5.0': {}
'@microsoft/fetch-event-source@2.0.1': {}
'@nodelib/fs.scandir@2.1.5': '@nodelib/fs.scandir@2.1.5':
dependencies: dependencies:
'@nodelib/fs.stat': 2.0.5 '@nodelib/fs.stat': 2.0.5

@ -0,0 +1,173 @@
import { fetchEventSource } from '@microsoft/fetch-event-source'
import { getToken } from '@/utils/auth'
/**
* @description: 创建sse连接
*/
class ServerSentEvents {
static defaultConfig = {
base: import.meta.env.VITE_APP_BASE_API, // 基础地址
url: '/sse', // 地址
data: undefined, // 请求正文
params: undefined, // 请求参数
method: 'get', // 提交方式
auth: true, // 是否携带token
json: true, // 是否返回json
returnData: false, // json数据是否返回data属性
reconnect: true, //是否重连
headers: {
'Content-Type': 'application/json'
},
onopen: () => { },
onmessage: () => { },
onerror: () => { },
onclose: () => { }
}
constructor(config) {
if (config) {
this.setConfig(config)
this.init()
}
}
static get(url, onmessage, config = {}) {
config.onmessage = onmessage
config.url = url
return new ServerSentEvents(config)
}
static post(url, data, onmessage, config = {}) {
config.onmessage = onmessage
config.url = url
config.method = 'post'
config.data = data
return new ServerSentEvents(config)
}
setConfig(config) {
this.config = {
ctrl: new AbortController(),
...ServerSentEvents.defaultConfig,
...config
}
console.debug('this.config', this.config)
}
init() {
if (this.config.auth) {
this.config.headers.Authorization = 'Bearer ' + getToken()
}
let url = this.config.url
// 如果url不含协议
if (url.indexOf("//") == -1) {
url = this.config.base + url
}
if (this.config.params) {
if (url.indexOf("?") > -1) {
url += '&' + this.params(this.config.params)
} else {
url += '?' + this.params(this.config.params)
}
}
let body = undefined
if (this.config.data && (this.config.method === 'post' || this.config.method === 'put')) {
if (this.config.data.constructor == URLSearchParams) {
this.config.headers['Content-Type'] = 'application/x-www-form-urlencoded'
body = this.params(this.config.data).toString()
} else if (this.config.data.constructor == FormData) {
this.config.headers['Content-Type'] = 'multipart/form-data'
body = this.config.data
} else {
body = JSON.stringify(body)
}
}
this.config._url = url
this.config._body = body
console.debug(this.config)
this.send()
}
send() {
fetchEventSource(this.config._url, {
method: this.config.method,
headers: this.config.headers,
body: this.config._body,
signal: this.config.ctrl.signal,
onopen: this.config.onopen,
onmessage: (msg) => {
if (this.config.json) {
let data = JSON.parse(msg.data)
if (this.config.returnData) {
data = data.data
}
this.config.onmessage(data)
} else {
this.config.onmessage(msg)
}
},
onclose: () => {
console.info('onclose')
this.abort()
this.config.onclose()
if (this.config.reconnect) {
this.send()
}
},
onerror: (err) => {
console.error(err)
this.abort()
this.config.onerror(err)
}
})
}
abort() {
if (this.config.ctrl && !this.config.reconnect) {
try {
this.config.ctrl.abort()
} catch (e) {
console.error(e)
}
}
}
close() {
this.abort()
}
params(param) {
if (param == null || param == "") {
return new URLSearchParams();
}
if (param.constructor == Array) {
let param1 = new URLSearchParams();
for (let obj of param) {
param1.append(obj.name, obj.value);
}
param = param1;
} else if (param.constructor == Object) {
let param1 = new URLSearchParams();
for (let name in param) {
param1.append(name, param[name]);
}
param = param1;
} else {
if (param.constructor == HTMLFormElement) {
param = new FormData(param);
}
if (param.constructor == FormData || param.constructor == String) {
param = new URLSearchParams(param);
}
}
return param;
}
}
export default ServerSentEvents

@ -29,10 +29,10 @@
<el-switch v-model="row.enabled" @change="handleEnabled(row)" /> <el-switch v-model="row.enabled" @change="handleEnabled(row)" />
</template> </template>
</el-table-column> </el-table-column>
<el-table-column label="操作" fixed="right" width="200" align="center"> <el-table-column label="操作" fixed="right" width="220" align="center">
<template #default="{ row }"> <template #default="{ row }">
<el-button-group> <el-button-group>
<el-button type="default" link @click="proxy.$refs.runRef.open(taskList, row);">运行</el-button> <el-button type="success" link @click="proxy.$refs.runRef.open(taskList, row);">手动运行</el-button>
<el-button type="default" link @click="subCron(row);"></el-button> <el-button type="default" link @click="subCron(row);"></el-button>
<el-button type="primary" link @click="proxy.$refs.editRef.open(taskList, row);">修改</el-button> <el-button type="primary" link @click="proxy.$refs.editRef.open(taskList, row);">修改</el-button>
<el-button type="danger" link @click="handleDelete(row)"></el-button> <el-button type="danger" link @click="handleDelete(row)"></el-button>

@ -1,4 +1,5 @@
import request from '@/utils/request' import request from '@/utils/request'
import SSE from '@/api/SSE'
const base = '/system/cron/' const base = '/system/cron/'
@ -18,3 +19,5 @@ export const logPage = (params) => request.get(base + 'log-page', { params })
export const current = (group = 0) => request.get(base + 'current-' + group) export const current = (group = 0) => request.get(base + 'current-' + group)
export const start = (cronId, taskId, paramEls) => request.post(base + "/start", request.params({ cronId, taskId, paramEls })) export const start = (cronId, taskId, paramEls) => request.post(base + "/start", request.params({ cronId, taskId, paramEls }))
export const doSSE = (groupId = 0, callback) => SSE.post(base + 'sse-' + groupId, {}, callback, { returnData: false })

@ -1,26 +1,61 @@
<template> <template>
<div class="xf-root"> <div class="xf-root">
<div class="search-form"> <div class="search-form">
<el-select
<el-select v-model="search.taskId" style="width: 10em; display: block;" clearable placeholder="任务"> v-model="search.taskId"
<el-option v-for="item in taskList" :key="item.id" :label="item.name" :value="item.id" /> style="width: 10em; display: block"
clearable
placeholder="任务"
>
<el-option
v-for="item in taskList"
:key="item.id"
:label="item.name"
:value="item.id"
/>
</el-select> </el-select>
<el-select v-model="search.ok" style="width: 6em; display: block;" clearable placeholder="成败"> <el-select
v-model="search.ok"
style="width: 6em; display: block"
clearable
placeholder="成败"
>
<el-option label="成功" :value="true" /> <el-option label="成功" :value="true" />
<el-option label="失败" :value="false" /> <el-option label="失败" :value="false" />
</el-select> </el-select>
<div style="width: 20em;"><el-date-picker v-model="daterange" value-format="YYYY-MM-DD" type="daterange" <div style="width: 20em">
range-separator="至" start-placeholder="开始日期" end-placeholder="结束日期" /></div> <el-date-picker
v-model="daterange"
value-format="YYYY-MM-DD"
type="daterange"
range-separator="至"
start-placeholder="开始日期"
end-placeholder="结束日期"
/>
</div>
<el-button type="default" @click="handleSearch"></el-button> <el-button type="default" @click="handleSearch"></el-button>
<el-badge :value="cList.length" style="margin-left: auto; margin-right: 1em;"> <el-badge
<el-button :disabled="cList.length == 0" type="primary" plain @click="cShow = true">运行中</el-button> :value="cList.length"
style="margin-left: auto; margin-right: 1em"
>
<el-button
:disabled="cList.length == 0"
type="primary"
plain
@click="cShow = true"
>运行中</el-button
>
</el-badge> </el-badge>
<el-button type="primary" @click="proxy.$refs.cronListRef.open(taskList, query.groupId);">定时任务</el-button> <el-button
type="primary"
@click="proxy.$refs.cronListRef.open(taskList, query.groupId)"
>定时任务</el-button
>
</div> </div>
<el-table :data="list" border stripe height="100%" v-loading="loading"> <el-table :data="list" border stripe height="100%" v-loading="loading">
<el-table-column label="任务名称" align="left"> <el-table-column label="任务名称" align="left">
<template #default="{ row }"> <template #default="{ row }">
{{ taskList.find(a => a.id == row.taskId)?.name || '未知任务' }} {{ taskList.find((a) => a.id == row.taskId)?.name || "未知任务" }}
</template> </template>
</el-table-column> </el-table-column>
<el-table-column label="执行结果" align="left"> <el-table-column label="执行结果" align="left">
@ -30,7 +65,7 @@
<template #reference> <template #reference>
<el-button type="success" link>执行成功</el-button> <el-button type="success" link>执行成功</el-button>
</template> </template>
<div style="width: 100%; height: 400px; overflow: auto;"> <div style="width: 100%; height: 400px; overflow: auto">
<pre>{{ row.ex }}</pre> <pre>{{ row.ex }}</pre>
</div> </div>
</el-popover> </el-popover>
@ -38,9 +73,11 @@
<template v-else> <template v-else>
<el-popover placement="bottom" :width="800" trigger="hover"> <el-popover placement="bottom" :width="800" trigger="hover">
<template #reference> <template #reference>
<el-button type="danger" link>{{ row.msg || '未知异常' }}</el-button> <el-button type="danger" link>{{
row.msg || "未知异常"
}}</el-button>
</template> </template>
<div style="width: 100%; height: 400px; overflow: auto;"> <div style="width: 100%; height: 400px; overflow: auto">
<pre>{{ row.ex }}</pre> <pre>{{ row.ex }}</pre>
</div> </div>
</el-popover> </el-popover>
@ -49,15 +86,13 @@
</el-table-column> </el-table-column>
<el-table-column label="参数" align="center" width="100"> <el-table-column label="参数" align="center" width="100">
<template #default="{ row }"> <template #default="{ row }">
<template v-if="!row.params"> <template v-if="!row.params"> </template>
</template>
<template v-else> <template v-else>
<el-popover placement="bottom" :width="300" trigger="hover"> <el-popover placement="bottom" :width="300" trigger="hover">
<template #reference> <template #reference>
{{ JSON.parse(row.params).length }}个参数 {{ JSON.parse(row.params).length }}个参数
</template> </template>
<div style="width: 100%; height: 200px; overflow: auto;"> <div style="width: 100%; height: 200px; overflow: auto">
<pre>{{ JSON.stringify(JSON.parse(row.params), null, 2) }}</pre> <pre>{{ JSON.stringify(JSON.parse(row.params), null, 2) }}</pre>
</div> </div>
</el-popover> </el-popover>
@ -65,36 +100,70 @@
</template> </template>
</el-table-column> </el-table-column>
<el-table-column
label="启动时间"
<el-table-column label="启动时间" align="center" width="170" prop="startTime" /> align="center"
width="170"
prop="startTime"
/>
<el-table-column label="耗时" align="center" width="100"> <el-table-column label="耗时" align="center" width="100">
<template #default="{ row }"> <template #default="{ row }">
{{ times(row) }} {{ times(row) }}
</template> </template>
</el-table-column> </el-table-column>
</el-table> </el-table>
<pagination v-show="total > 0" :page-sizes="[10, 20, 50]" :total="total" v-model:page="query.current" <pagination
v-model:limit="query.size" @pagination="loadPage" /> v-show="total > 0"
:page-sizes="[10, 20, 50]"
:total="total"
v-model:page="query.current"
v-model:limit="query.size"
@pagination="loadPage"
/>
<w-cron-list ref="cronListRef" /> <w-cron-list ref="cronListRef" />
<el-drawer direction="rtl" size="40%" title="正在运行的任务" v-model="cShow"> <el-drawer
direction="rtl"
size="40%"
title="正在运行的任务"
v-model="cShow"
>
<el-collapse :modelValue="0" accordion> <el-collapse :modelValue="0" accordion>
<el-collapse-item v-for="row in cList" :key="row.id"> <el-collapse-item v-for="row in cList" :key="row.id">
<template #title> <template #title>
<div style="display: grid; width: 100%; grid-template-columns: 2fr 1fr 1fr; "> <div
<div style="font-weight: bold; text-align: left;"> {{ taskList.find(a => a.id == row.taskId)?.name || style="
'未知任务' }}</div> display: grid;
<div>{{ row.startTime.toDate("yyyy-MM-dd HH:mm:ss").format("HH:mm:ss") }}</div> width: 100%;
<div style="display: flex; align-items: center; justify-content: center;"> grid-template-columns: 2fr 1fr 1fr;
<div class="w-loading" style="margin-right: .5em;"></div> 运行中 "
>
<div style="font-weight: bold; text-align: left">
{{
taskList.find((a) => a.id == row.taskId)?.name || "未知任务"
}}
</div>
<div>
{{
row.startTime.toDate("yyyy-MM-dd HH:mm:ss").format("HH:mm:ss")
}}
</div>
<div
style="
display: flex;
align-items: center;
justify-content: center;
"
>
<div class="w-loading" style="margin-right: 0.5em"></div>
运行中
</div> </div>
</div> </div>
</template> </template>
<div style="width: 100%; height: 400px; overflow: auto;"> <div style="width: 100%; height: 400px; overflow: auto">
<pre>{{ row.ex }}</pre> <pre>{{ row.ex }}</pre>
</div> </div>
</el-collapse-item> </el-collapse-item>
@ -103,15 +172,23 @@
</div> </div>
</template> </template>
<script setup> <script setup>
import WCronList from './components/WCronList.vue' import WCronList from "./components/WCronList.vue";
import { ref, getCurrentInstance, watch, onMounted, computed, onUnmounted } from 'vue' import {
import { ElLoading, ElMessage } from 'element-plus' ref,
import * as api from './cron' getCurrentInstance,
watch,
onMounted,
computed,
onUnmounted,
} from "vue";
import { ElLoading, ElMessage } from "element-plus";
import * as api from "./cron";
const daterange = computed({ const daterange = computed({
get() { get() {
return search.value.beginDate ? [search.value.beginDate, search.value.endDate] : [] return search.value.beginDate
? [search.value.beginDate, search.value.endDate]
: [];
}, },
set(value) { set(value) {
if (value && value[0]) { if (value && value[0]) {
@ -121,13 +198,17 @@ const daterange = computed({
search.value.beginDate = ""; search.value.beginDate = "";
search.value.endDate = ""; search.value.endDate = "";
} }
} },
});
})
const { proxy } = getCurrentInstance() const { proxy } = getCurrentInstance();
const times = (a) => ((a.endTime.toDate("yyyy-MM-dd HH:mm:ss") - a.startTime.toDate("yyyy-MM-dd HH:mm:ss")) / 1000).toTime() const times = (a) =>
(
(a.endTime.toDate("yyyy-MM-dd HH:mm:ss") -
a.startTime.toDate("yyyy-MM-dd HH:mm:ss")) /
1000
).toTime();
// //
const search = ref({}); const search = ref({});
@ -135,7 +216,7 @@ const search = ref({});
const query = ref({ const query = ref({
current: 1, current: 1,
size: 10, size: 10,
orderBy: 'endTime desc' orderBy: "endTime desc",
}); });
// //
@ -153,57 +234,60 @@ const loadPage = async () => {
list.value = r.data?.records || []; list.value = r.data?.records || [];
total.value = Number(r.data?.total || 0); total.value = Number(r.data?.total || 0);
loading.value = false; loading.value = false;
} };
// //
const handleSearch = () => { const handleSearch = () => {
query.value = { ...query.value, ...search.value }; query.value = { ...query.value, ...search.value };
query.value.current = 1; query.value.current = 1;
loadPage(); loadPage();
} };
const taskList = ref([]);
const taskList = ref([])
const init = async () => { const init = async () => {
let r = await api.list(proxy.$route.query.group || 0) let r = await api.list(proxy.$route.query.group || 0);
taskList.value = r.data taskList.value = r.data;
query.value.groupId = proxy.$route.query.group || 0
handleSearch()
}
query.value.groupId = proxy.$route.query.group || 0;
handleSearch();
};
const cList = ref([]) const cList = ref([]);
const cShow = ref(false) const cShow = ref(false);
watch( watch(
() => cList.value.length, () => { () => cList.value.length,
() => {
handleSearch(); handleSearch();
if (cList.value.length == 0) { if (cList.value.length == 0) {
cShow.value = false; cShow.value = false;
} }
} }
) );
let sse = null;
let timer = null;
onMounted(async () => { onMounted(async () => {
await proxy.$nextTick() await proxy.$nextTick();
await init() await init();
timer = window.setInterval(async () => { let r = await api.current(query.value.groupId);
let r = await api.current(query.value.groupId) cList.value = r.data;
cList.value = r.data sse = api.doSSE(query.value.groupId, (data) => {
}, 500); if (data.action == "list") {
}) cList.value = data.data;
} else if (data.action == "log") {
let item = cList.value.find((item) => item.id == data.data.logId);
if (item && data.data.ex) {
item.ex += "\n" + data.data.ex;
}
}
});
});
onUnmounted(() => { onUnmounted(() => {
window.clearInterval(timer) sse.close();
}) });
</script> </script>
<style lang="scss" scoped> <style lang="scss" scoped>
.xf-root { .xf-root {
@ -213,11 +297,11 @@ onUnmounted(() => {
display: flex; display: flex;
flex-wrap: wrap; flex-wrap: wrap;
align-items: center; align-items: center;
padding: 0 0 .3rem 0; padding: 0 0 0.3rem 0;
position: relative; position: relative;
&>* { & > * {
margin: 0 .5rem .5rem 0; margin: 0 0.5rem 0.5rem 0;
display: flex; display: flex;
align-items: center; align-items: center;
} }

@ -26,3 +26,10 @@ values(@id+4, '定时任务删除', @id, '4', '#', '', 1, 0, 'F', '0', '0', 'sy
``` ```
# 前端依赖
```sh
pnpm i @microsoft/fetch-event-source@2.0.1
```
src/api/SSE.js

Binary file not shown.

@ -11,37 +11,33 @@ import com.baomidou.lock.annotation.Lock4j;
import com.ruoyi.common.annotation.Dev; import com.ruoyi.common.annotation.Dev;
import com.ruoyi.common.annotation.Log; import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.domain.Page; import com.ruoyi.common.core.domain.Page;
import com.ruoyi.common.core.domain.model.LoginUser;
import com.ruoyi.common.core.validate.AddGroup; import com.ruoyi.common.core.validate.AddGroup;
import com.ruoyi.common.core.validate.EditGroup; import com.ruoyi.common.core.validate.EditGroup;
import com.ruoyi.common.enums.BusinessType; import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.common.helper.LoginHelper; import com.ruoyi.common.helper.LoginHelper;
import com.ruoyi.common.sse.SseApiSupport;
import com.ruoyi.common.sse.SseEmitter;
import com.ruoyi.common.utils.IdUtils; import com.ruoyi.common.utils.IdUtils;
import com.ruoyi.common.utils.spring.SpringUtils; import com.ruoyi.common.utils.spring.SpringUtils;
import com.ruoyi.cron.document.CronTask; import com.ruoyi.cron.document.CronTask;
import com.ruoyi.cron.document.CronTaskLog; import com.ruoyi.cron.document.CronTaskLog;
import com.ruoyi.cron.event.CronTaskChangeEvent; import com.ruoyi.cron.event.*;
import com.ruoyi.cron.event.CronTaskEvent;
import com.ruoyi.cron.event.CronTaskLogEvent;
import com.ruoyi.cron.query.CronTaskLogQuery; import com.ruoyi.cron.query.CronTaskLogQuery;
import com.ruoyi.cron.runner.CronBeanPostProcessor; import com.ruoyi.cron.runner.CronBeanPostProcessor;
import com.ruoyi.cron.runner.CronRunner; import com.ruoyi.cron.runner.CronRunner;
import com.ruoyi.cron.task.CronTaskTestTask; import com.ruoyi.cron.task.CronTaskTestTask;
import com.ruoyi.cron.vo.CronTaskVo; import com.ruoyi.cron.vo.CronTaskVo;
import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.List; import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.ruoyi.common.utils.MongoUtil.*; import static com.ruoyi.common.utils.MongoUtil.*;
@ -49,7 +45,8 @@ import static com.ruoyi.common.utils.MongoUtil.*;
@RestController @RestController
@RequiredArgsConstructor @RequiredArgsConstructor
@RequestMapping("/system/cron/") @RequestMapping("/system/cron/")
public class CronTaskApi { @Validated
public class CronTaskApi extends SseApiSupport<Integer> {
private final MongoTemplate template; private final MongoTemplate template;
@ -100,59 +97,26 @@ public class CronTaskApi {
} }
private final static List<MySseEmitter> emitters = new CopyOnWriteArrayList<>(); @PostMapping("/sse-{groupId}")
public SseEmitter subscribeGroup(@PathVariable @NotNull(message = "分组号不能为空") Integer groupId) {
public static class MySseEmitter extends SseEmitter { return handleSubscribe(groupId);
@Getter
private Integer id;
public MySseEmitter(long timeout, Integer id) {
super(timeout);
this.id = id;
}
}
/**
*
*
* @param id 0:
* @return
*/
@GetMapping("/sse-{id}")
@SaIgnore
public SseEmitter subscribe(@PathVariable Integer id) {
if (id == null) {
id = 0;
}
MySseEmitter emitter = new MySseEmitter(Long.MAX_VALUE, id);
emitters.add(emitter);
emitter.onTimeout(() -> {
emitters.remove(emitter);
});
emitter.onCompletion(() -> {
emitters.remove(emitter);
});
emitter.onError(e -> {
emitters.remove(emitter);
});
return emitter;
} }
@EventListener @EventListener
@Async @Async
public void listenter(CronTaskLogEvent event) { public void listenter(CronTaskLogEvent event) {
List<MySseEmitter> es = ListUtil.list(true); send(event.getGroupId(), "log", event);
for (MySseEmitter e : emitters) {
if (!event.getGroupId().equals(e.getId())) {
continue;
}
try {
e.send(event);
} catch (Exception e1) {
es.add(e);
} }
@EventListener
public void listenter(CronTaskStartEvent event) {
send(event.getGroupId(), "list", runner.getCurrentLogs().stream().filter(l -> l.getGroupId() == event.getGroupId()).collect(Collectors.toList()));
} }
emitters.removeAll(es);
@EventListener
@Async
public void listenter(CronTaskEndEvent event) {
send(event.getGroupId(), "list", runner.getCurrentLogs().stream().filter(l -> l.getGroupId() == event.getGroupId()).collect(Collectors.toList()));
} }
@PostMapping @PostMapping

@ -2,6 +2,7 @@ package com.ruoyi.cron.document;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
import com.ruoyi.common.core.validate.AddGroup;
import com.ruoyi.common.core.validate.EditGroup; import com.ruoyi.common.core.validate.EditGroup;
import lombok.Data; import lombok.Data;
import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Id;
@ -24,6 +25,7 @@ public class CronTask {
private Long id; private Long id;
@Indexed @Indexed
@NotNull(message = "请选择任务" , groups = {AddGroup.class})
private String taskId; private String taskId;
@Indexed @Indexed

@ -4,6 +4,9 @@ import com.ruoyi.cron.document.CronTask;
import lombok.Getter; import lombok.Getter;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
/**
*
*/
public class CronTaskChangeEvent extends ApplicationEvent { public class CronTaskChangeEvent extends ApplicationEvent {
@Getter @Getter

@ -0,0 +1,27 @@
package com.ruoyi.cron.event;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
*
*/
public class CronTaskEndEvent extends ApplicationEvent {
@Getter
private String taskId;
@Getter
private Long cronId;
@Getter
private Integer groupId;
public CronTaskEndEvent(Long cronId, String taskId, Integer groupId) {
super(taskId);
this.taskId = taskId;
this.groupId = groupId;
this.cronId = cronId;
}
}

@ -5,6 +5,9 @@ import org.springframework.context.ApplicationEvent;
import java.util.List; import java.util.List;
/**
*
*/
public class CronTaskEvent extends ApplicationEvent { public class CronTaskEvent extends ApplicationEvent {
@Getter @Getter
@ -16,6 +19,7 @@ public class CronTaskEvent extends ApplicationEvent {
@Getter @Getter
private Long cronId; private Long cronId;
public CronTaskEvent(Long cronId, String taskId, List<String> paramELs) { public CronTaskEvent(Long cronId, String taskId, List<String> paramELs) {
super(taskId); super(taskId);
this.taskId = taskId; this.taskId = taskId;

@ -1,26 +1,33 @@
package com.ruoyi.cron.event; package com.ruoyi.cron.event;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter; import lombok.Getter;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
import java.time.Clock; import java.time.Clock;
/**
*
*/
@Getter @Getter
public class CronTaskLogEvent extends ApplicationEvent { public class CronTaskLogEvent extends ApplicationEvent {
private Long logId;
private String taskId; private String taskId;
private Integer groupId; private Integer groupId;
private String ex; private String ex;
public CronTaskLogEvent(String taskId, Integer groupId, String ex) { public CronTaskLogEvent(Long logId,String taskId, Integer groupId, String ex) {
super(groupId); super(groupId);
this.logId = logId;
this.taskId = taskId; this.taskId = taskId;
this.groupId = groupId; this.groupId = groupId;
this.ex = ex; this.ex = ex;
} }
@Override @Override
@JsonIgnore
public Integer getSource() { public Integer getSource() {
return (Integer) super.getSource(); return (Integer) super.getSource();
} }

@ -0,0 +1,29 @@
package com.ruoyi.cron.event;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
import java.util.List;
/**
*
*/
public class CronTaskStartEvent extends ApplicationEvent {
@Getter
private String taskId;
@Getter
private Long cronId;
@Getter
private Integer groupId;
public CronTaskStartEvent(Long cronId, String taskId, Integer groupId) {
super(taskId);
this.taskId = taskId;
this.groupId = groupId;
this.cronId = cronId;
}
}

@ -14,7 +14,9 @@ import com.ruoyi.common.utils.spring.SpringUtils;
import com.ruoyi.cron.document.CronTask; import com.ruoyi.cron.document.CronTask;
import com.ruoyi.cron.document.CronTaskLog; import com.ruoyi.cron.document.CronTaskLog;
import com.ruoyi.cron.event.CronTaskChangeEvent; import com.ruoyi.cron.event.CronTaskChangeEvent;
import com.ruoyi.cron.event.CronTaskEndEvent;
import com.ruoyi.cron.event.CronTaskEvent; import com.ruoyi.cron.event.CronTaskEvent;
import com.ruoyi.cron.event.CronTaskStartEvent;
import com.ruoyi.cron.vo.CronTaskVo; import com.ruoyi.cron.vo.CronTaskVo;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -160,6 +162,7 @@ public class CronRunner implements ApplicationRunner {
taskLog.setParams(JsonUtils.toJsonString(param)); taskLog.setParams(JsonUtils.toJsonString(param));
currentLogs.add(taskLog); currentLogs.add(taskLog);
log.info("开始执行任务:" + vo.getName() + "\t参数" + taskLog.getParams()); log.info("开始执行任务:" + vo.getName() + "\t参数" + taskLog.getParams());
SpringUtils.publishEvent(new CronTaskStartEvent(event.getCronId(), vo.getId(),vo.getGroup()));
vo.getMethod().invoke(vo.getBean(), param); vo.getMethod().invoke(vo.getBean(), param);
taskLog.setOk(true); taskLog.setOk(true);
@ -190,6 +193,7 @@ public class CronRunner implements ApplicationRunner {
TaskLogImpl.logHolder.remove(); TaskLogImpl.logHolder.remove();
} }
taskLog.setEndTime(new Date()); taskLog.setEndTime(new Date());
SpringUtils.publishEvent(new CronTaskEndEvent(event.getCronId(), vo.getId(),vo.getGroup()));
currentLogs.remove(taskLog); currentLogs.remove(taskLog);
template.insert(taskLog); template.insert(taskLog);

@ -26,6 +26,6 @@ public class TaskLogImpl implements TaskLog {
} else { } else {
logHolder.get().setEx(logHolder.get().getEx() + "\n" + ex); logHolder.get().setEx(logHolder.get().getEx() + "\n" + ex);
} }
SpringUtils.publishEvent(new CronTaskLogEvent(logHolder.get().getTaskId(),logHolder.get().getGroupId(),ex)); SpringUtils.publishEvent(new CronTaskLogEvent(logHolder.get().getId(),logHolder.get().getTaskId(),logHolder.get().getGroupId(),ex));
} }
} }

Loading…
Cancel
Save