You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

202 lines
7.3 KiB
Java

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.ruoyi.cron.runner;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import com.ruoyi.common.utils.IdUtils;
import com.ruoyi.common.utils.JsonUtils;
import com.ruoyi.common.utils.MongoUtil;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.ruoyi.cron.document.CronTask;
import com.ruoyi.cron.document.CronTaskLog;
import com.ruoyi.cron.event.CronTaskChangeEvent;
import com.ruoyi.cron.event.CronTaskEndEvent;
import com.ruoyi.cron.event.CronTaskEvent;
import com.ruoyi.cron.event.CronTaskStartEvent;
import com.ruoyi.cron.vo.CronTaskVo;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
/**
 * Schedules the {@link CronTask} documents stored in MongoDB on the local
 * {@link ThreadPoolTaskScheduler} and executes them when a {@link CronTaskEvent} is published.
 *
 * <p>On application start every stored task is loaded and, when it is enabled and carries a
 * cron expression, registered with the scheduler. {@link CronTaskChangeEvent}s add, replace or
 * cancel schedules afterwards. Each execution is recorded as a {@link CronTaskLog}.
 *
 * <p>The event listeners are annotated with {@link Async}, so the schedule registry and the list
 * of running logs are concurrent collections.
 */
@Configuration
@Slf4j
@RequiredArgsConstructor
public class CronRunner implements ApplicationRunner {

    private final CronBeanPostProcessor processor;
    private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
    private final MongoTemplate template;
    private final ApplicationContext act;
    private final ExpressionParser parser = new SpelExpressionParser();

    /** Active schedules keyed by cron task id; concurrent because listeners run asynchronously. */
    @Getter
    private final Map<Long, ScheduledFuture<?>> cronMap = new ConcurrentHashMap<>();

    /** Logs of the task executions that are currently in progress. */
    @Getter
    private final List<CronTaskLog> currentLogs = new CopyOnWriteArrayList<>();

    /**
     * Sorts the available task beans by descending order value and schedules every stored task.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("本地定时模块启动成功");
        // Highest order first; Integer.compare cannot overflow the way a subtraction can.
        ListUtil.sort(processor.getList(), (left, right) -> Integer.compare(right.getOrder(), left.getOrder()));
        if (log.isDebugEnabled()) {
            log.debug("可用的本地定时任务列表: {}", JsonUtils.toPrettyString(processor.getList()));
        }
        template.findAll(CronTask.class).forEach(this::addCronTask);
    }

    /**
     * Registers the given task with the scheduler when it is enabled, has a valid cron
     * expression and refers to a known task bean. An existing schedule under the same id is
     * cancelled so the task never fires twice.
     *
     * @param cronTask the stored task definition
     */
    private void addCronTask(CronTask cronTask) {
        // Disabled tasks are never scheduled (a missing flag counts as disabled).
        if (!Boolean.TRUE.equals(cronTask.getEnabled())) {
            return;
        }
        // Tasks without a cron expression are manual-only and are not scheduled.
        if (StrUtil.isBlank(cronTask.getCron())) {
            return;
        }
        final Long cronId = cronTask.getId();
        if (cronId == null) {
            // The registry is keyed by id; a task without one cannot be tracked or cancelled.
            log.warn("本地定时任务缺少编号,无法调度,任务:{}", cronTask.getTaskId());
            return;
        }
        final CronTrigger trigger;
        try {
            trigger = new CronTrigger(cronTask.getCron());
        } catch (Exception ex) {
            log.warn("本地定时任务cron表达式错误,定时任务编号:{}", cronId, ex);
            return;
        }
        if (findTask(cronTask.getTaskId()) == null) {
            log.warn("未发现任务:{}", cronTask.getTaskId());
            return;
        }
        ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(() -> {
            SpringUtils.publishEvent(new CronTaskEvent(cronTask.getId(), cronTask.getTaskId(), cronTask.getParamELs()));
        }, trigger);
        if (future == null) {
            // The scheduler returns null when the trigger will never fire; nothing to track.
            log.warn("本地定时任务未能调度(触发器不会触发),定时任务编号:{},cron:{}", cronId, cronTask.getCron());
            return;
        }
        ScheduledFuture<?> previous = cronMap.put(cronId, future);
        if (previous != null) {
            // Replace rather than leak a schedule that was already registered under this id.
            previous.cancel(true);
        }
        log.info("本地定时任务启动成功,定时任务编号:{},cron:{}", cronId, cronTask.getCron());
    }

    /**
     * Cancels and forgets the schedule registered under the given id, if there is one.
     *
     * @param cronId id of the cron task; {@code null} is ignored
     */
    private void cancelCronTask(Long cronId) {
        if (cronId == null) {
            return;
        }
        // Single atomic remove instead of containsKey/get/remove.
        ScheduledFuture<?> future = cronMap.remove(cronId);
        if (future != null) {
            future.cancel(true);
            log.info("本地定时任务停止成功,定时任务编号:{}", cronId);
        }
    }

    /**
     * Looks up the task bean descriptor with the given task id.
     *
     * @param taskId id to look for
     * @return the matching descriptor, or {@code null} when none is registered
     */
    private CronTaskVo findTask(Object taskId) {
        return processor.getList().stream()
                .filter(bean -> bean.getId().equals(taskId))
                .findFirst()
                .orElse(null);
    }

    /**
     * Applies a task change to the scheduler: ADD schedules, DELETE cancels, UPDATE cancels the
     * old schedule and then schedules the new definition.
     *
     * @param event the change notification carrying the affected task
     */
    @EventListener
    @Async
    public void onCronTaskChange(CronTaskChangeEvent event) {
        CronTask cronTask = event.getSource();
        CronTaskChangeEvent.ChangeType type = event.getChangeType();
        if (type == CronTaskChangeEvent.ChangeType.DELETE || type == CronTaskChangeEvent.ChangeType.UPDATE) {
            cancelCronTask(cronTask.getId());
        }
        if (type == CronTaskChangeEvent.ChangeType.ADD || type == CronTaskChangeEvent.ChangeType.UPDATE) {
            addCronTask(cronTask);
        }
    }

    /**
     * Executes the task named by the event, records the outcome as a {@link CronTaskLog} and,
     * after a successful run triggered by a stored cron task, dispatches its enabled child tasks.
     *
     * @param event the execution request
     */
    @EventListener
    @Async
    public void onCronTask(CronTaskEvent event) {
        long startTime = System.currentTimeMillis();
        final CronTaskVo vo = findTask(event.getTaskId());
        if (vo == null) {
            log.warn("未发现任务:{}", event.getTaskId());
            return;
        }
        CronTaskLog taskLog = new CronTaskLog();
        taskLog.setId(Long.valueOf(IdUtils.nextDateId(CronTaskLog.class.getSimpleName(), 17)));
        taskLog.setTaskId(vo.getId());
        taskLog.setGroupId(vo.getGroup());
        taskLog.setStartTime(new Date(startTime));
        TaskLogImpl.logHolder.set(taskLog);
        try {
            Object[] param = resolveParams(vo, event);
            taskLog.setParams(JsonUtils.toJsonString(param));
            currentLogs.add(taskLog);
            log.info("开始执行任务:{}\t参数{}", vo.getName(), taskLog.getParams());
            SpringUtils.publishEvent(new CronTaskStartEvent(event.getCronId(), vo.getId(), vo.getGroup()));
            vo.getMethod().invoke(vo.getBean(), param);
            taskLog.setOk(true);
            log.info("执行任务成功:{}", vo.getName());
            // After a successful run, continue with the child tasks.
            dispatchChildTasks(event);
        } catch (Exception ex) {
            recordFailure(taskLog, ex);
            log.warn("执行任务失败:{}", vo.getName(), ex);
        } finally {
            TaskLogImpl.logHolder.remove();
        }
        taskLog.setEndTime(new Date());
        SpringUtils.publishEvent(new CronTaskEndEvent(event.getCronId(), vo.getId(), vo.getGroup()));
        currentLogs.remove(taskLog);
        template.insert(taskLog);
    }

    /**
     * Evaluates the event's SpEL parameter expressions into the arguments of the task method.
     *
     * @param vo    descriptor of the task to invoke
     * @param event execution request carrying one expression per method parameter
     * @return the evaluated arguments, or {@code null} when the method takes no parameters
     */
    private Object[] resolveParams(CronTaskVo vo, CronTaskEvent event) {
        Class<?>[] paramTypes = vo.getMethod().getParameterTypes();
        if (paramTypes.length == 0) {
            return null;
        }
        EvaluationContext context = new StandardEvaluationContext(act);
        context.setVariable("act", act);
        context.setVariable("bean", vo.getBean());
        Object[] param = new Object[paramTypes.length];
        for (int i = 0; i < param.length; i++) {
            param[i] = parser.parseExpression(event.getParamELs().get(i)).getValue(context, paramTypes[i]);
        }
        return param;
    }

    /**
     * Publishes an execution event for every enabled task whose parent is the cron task that
     * triggered this run. Runs without a cron id have no children and dispatch nothing.
     *
     * @param event the execution request that just completed successfully
     */
    private void dispatchChildTasks(CronTaskEvent event) {
        if (event.getCronId() == null) {
            return;
        }
        List<CronTask> children = MongoUtil.find(CronTask.class,
                MongoUtil.conditions().put("pid", event.getCronId()).put("enabled", true));
        if (CollUtil.isNotEmpty(children)) {
            children.forEach(child ->
                    SpringUtils.publishEvent(new CronTaskEvent(child.getId(), child.getTaskId(), child.getParamELs())));
        }
    }

    /**
     * Marks the log as failed and stores the failure message and stack trace, appending to any
     * trace already present on the log.
     *
     * @param taskLog log of the failed execution
     * @param ex      the exception caught while preparing or invoking the task
     */
    private void recordFailure(CronTaskLog taskLog, Exception ex) {
        // Reflection wraps the task's own exception as the cause; other failures (expression
        // errors, missing parameters) have no cause, so fall back to the exception itself.
        Throwable failure = ex.getCause() != null ? ex.getCause() : ex;
        taskLog.setOk(false);
        taskLog.setMsg(failure.getMessage());
        String trace = ExceptionUtil.stacktraceToString(failure);
        if (StrUtil.isBlank(taskLog.getEx())) {
            taskLog.setEx(trace);
        } else {
            taskLog.setEx(taskLog.getEx() + "\n" + trace);
        }
    }
}