fix: fixed-cron-expression schedule #64

This commit is contained in:
tjq 2020-10-08 11:19:24 +08:00
parent a138e05404
commit fb61ad0bc6
2 changed files with 45 additions and 28 deletions

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.service; package com.github.kfcfans.powerjob.server.service;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO; import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
@ -78,7 +79,7 @@ public class JobService {
jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds())); jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds()));
} }
refreshJob(jobInfoDO); calculateNextTriggerTime(jobInfoDO);
if (request.getId() == null) { if (request.getId() == null) {
jobInfoDO.setGmtCreate(new Date()); jobInfoDO.setGmtCreate(new Date());
} }
@ -143,7 +144,7 @@ public class JobService {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV()); jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());
refreshJob(jobInfoDO); calculateNextTriggerTime(jobInfoDO);
jobInfoRepository.saveAndFlush(jobInfoDO); jobInfoRepository.saveAndFlush(jobInfoDO);
} }
@ -184,7 +185,7 @@ public class JobService {
}); });
} }
private void refreshJob(JobInfoDO jobInfoDO) throws Exception { private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws Exception {
// 计算下次调度时间 // 计算下次调度时间
Date now = new Date(); Date now = new Date();
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
@ -192,6 +193,9 @@ public class JobService {
if (timeExpressionType == TimeExpressionType.CRON) { if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(now); Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
if (nextValidTime == null) {
throw new PowerJobException("invalid cron expression: " + jobInfoDO.getTimeExpression());
}
jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); jobInfoDO.setNextTriggerTime(nextValidTime.getTime());
}else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) { }else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) {
jobInfoDO.setTimeExpression(null); jobInfoDO.setTimeExpression(null);

View File

@ -121,8 +121,6 @@ public class OmsScheduleService {
*/ */
private void scheduleCronJob(List<Long> appIds) { private void scheduleCronJob(List<Long> appIds) {
Date now = new Date();
long nowTime = System.currentTimeMillis(); long nowTime = System.currentTimeMillis();
long timeThreshold = nowTime + 2 * SCHEDULE_RATE; long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> { Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
@ -165,24 +163,13 @@ public class OmsScheduleService {
}); });
// 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms // 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms
List<JobInfoDO> updatedJobInfos = Lists.newLinkedList();
jobInfos.forEach(jobInfoDO -> { jobInfos.forEach(jobInfoDO -> {
try { try {
refreshJob(jobInfoDO);
Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression());
JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfoDO, updatedJobInfo);
updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
updatedJobInfo.setGmtModified(now);
updatedJobInfos.add(updatedJobInfo);
} catch (Exception e) { } catch (Exception e) {
log.error("[Job-{}] calculate next trigger time failed.", jobInfoDO.getId(), e); log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e);
} }
}); });
jobInfoRepository.saveAll(updatedJobInfos);
jobInfoRepository.flush(); jobInfoRepository.flush();
@ -203,7 +190,6 @@ public class OmsScheduleService {
return; return;
} }
Date now = new Date();
wfInfos.forEach(wfInfo -> { wfInfos.forEach(wfInfo -> {
// 1. 先生成调度记录防止不调度的情况发生 // 1. 先生成调度记录防止不调度的情况发生
@ -219,16 +205,9 @@ public class OmsScheduleService {
// 3. 重新计算下一次调度时间并更新 // 3. 重新计算下一次调度时间并更新
try { try {
Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); refreshWorkflow(wfInfo);
WorkflowInfoDO updateEntity = new WorkflowInfoDO();
BeanUtils.copyProperties(wfInfo, updateEntity);
updateEntity.setNextTriggerTime(nextTriggerTime.getTime());
updateEntity.setGmtModified(now);
workflowInfoRepository.save(updateEntity);
}catch (Exception e) { }catch (Exception e) {
log.error("[Workflow-{}] parse cron failed.", wfInfo.getId(), e); log.error("[Workflow-{}] refresh workflow failed.", wfInfo.getId(), e);
} }
}); });
workflowInfoRepository.flush(); workflowInfoRepository.flush();
@ -264,6 +243,40 @@ public class OmsScheduleService {
}); });
} }
private void refreshJob(JobInfoDO jobInfo) throws Exception {
Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression());
JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfo, updatedJobInfo);
if (nextTriggerTime == null) {
log.warn("[Job-{}] this job won't be scheduled anymore, system will set the status to DISABLE!", jobInfo.getId());
updatedJobInfo.setStatus(SwitchableStatus.DISABLE.getV());
}else {
updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
}
updatedJobInfo.setGmtModified(new Date());
jobInfoRepository.save(updatedJobInfo);
}
private void refreshWorkflow(WorkflowInfoDO wfInfo) throws Exception {
Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression());
WorkflowInfoDO updateEntity = new WorkflowInfoDO();
BeanUtils.copyProperties(wfInfo, updateEntity);
if (nextTriggerTime == null) {
log.warn("[Workflow-{}] this workflow won't be scheduled anymore, system will set the status to DISABLE!", wfInfo.getId());
wfInfo.setStatus(SwitchableStatus.DISABLE.getV());
}else {
updateEntity.setNextTriggerTime(nextTriggerTime.getTime());
}
updateEntity.setGmtModified(new Date());
workflowInfoRepository.save(updateEntity);
}
/** /**
* 计算下次触发时间 * 计算下次触发时间
* @param preTriggerTime 前一次触发时间 * @param preTriggerTime 前一次触发时间