diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 9b8a874c..9ae00a1c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.service; import com.github.kfcfans.powerjob.common.InstanceStatus; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.JobInfoDTO; @@ -78,7 +79,7 @@ public class JobService { jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds())); } - refreshJob(jobInfoDO); + calculateNextTriggerTime(jobInfoDO); if (request.getId() == null) { jobInfoDO.setGmtCreate(new Date()); } @@ -143,7 +144,7 @@ public class JobService { JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV()); - refreshJob(jobInfoDO); + calculateNextTriggerTime(jobInfoDO); jobInfoRepository.saveAndFlush(jobInfoDO); } @@ -184,7 +185,7 @@ public class JobService { }); } - private void refreshJob(JobInfoDO jobInfoDO) throws Exception { + private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws Exception { // 计算下次调度时间 Date now = new Date(); TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); @@ -192,6 +193,9 @@ public class JobService { if (timeExpressionType == TimeExpressionType.CRON) { CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); Date nextValidTime = cronExpression.getNextValidTimeAfter(now); + if (nextValidTime == null) { + throw new PowerJobException("invalid cron expression: " + jobInfoDO.getTimeExpression()); + } jobInfoDO.setNextTriggerTime(nextValidTime.getTime()); }else if (timeExpressionType == TimeExpressionType.API || timeExpressionType == TimeExpressionType.WORKFLOW) { jobInfoDO.setTimeExpression(null); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index 3da63e5d..2115a664 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -121,8 +121,6 @@ public class OmsScheduleService { */ private void scheduleCronJob(List appIds) { - - Date now = new Date(); long nowTime = System.currentTimeMillis(); long timeThreshold = nowTime + 2 * SCHEDULE_RATE; Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> { @@ -165,24 +163,13 @@ public class OmsScheduleService { }); // 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms) - List updatedJobInfos = Lists.newLinkedList(); jobInfos.forEach(jobInfoDO -> { - try { - - Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression()); - - JobInfoDO updatedJobInfo = new JobInfoDO(); - BeanUtils.copyProperties(jobInfoDO, updatedJobInfo); - updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime()); - updatedJobInfo.setGmtModified(now); - - updatedJobInfos.add(updatedJobInfo); + refreshJob(jobInfoDO); } catch (Exception e) { - log.error("[Job-{}] calculate next trigger time failed.", jobInfoDO.getId(), e); + log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e); } }); - jobInfoRepository.saveAll(updatedJobInfos); jobInfoRepository.flush(); @@ -203,7 +190,6 @@ public class OmsScheduleService { return; } - Date now = new Date(); wfInfos.forEach(wfInfo -> { // 1. 先生成调度记录,防止不调度的情况发生 @@ -219,16 +205,9 @@ public class OmsScheduleService { // 3. 重新计算下一次调度时间并更新 try { - Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); - - WorkflowInfoDO updateEntity = new WorkflowInfoDO(); - BeanUtils.copyProperties(wfInfo, updateEntity); - - updateEntity.setNextTriggerTime(nextTriggerTime.getTime()); - updateEntity.setGmtModified(now); - workflowInfoRepository.save(updateEntity); + refreshWorkflow(wfInfo); }catch (Exception e) { - log.error("[Workflow-{}] parse cron failed.", wfInfo.getId(), e); + log.error("[Workflow-{}] refresh workflow failed.", wfInfo.getId(), e); } }); workflowInfoRepository.flush(); @@ -264,6 +243,40 @@ public class OmsScheduleService { }); } + private void refreshJob(JobInfoDO jobInfo) throws Exception { + Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression()); + + JobInfoDO updatedJobInfo = new JobInfoDO(); + BeanUtils.copyProperties(jobInfo, updatedJobInfo); + + if (nextTriggerTime == null) { + log.warn("[Job-{}] this job won't be scheduled anymore, system will set the status to DISABLE!", jobInfo.getId()); + updatedJobInfo.setStatus(SwitchableStatus.DISABLE.getV()); + }else { + updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime()); + } + updatedJobInfo.setGmtModified(new Date()); + + jobInfoRepository.save(updatedJobInfo); + } + + private void refreshWorkflow(WorkflowInfoDO wfInfo) throws Exception { + Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); + + WorkflowInfoDO updateEntity = new WorkflowInfoDO(); + BeanUtils.copyProperties(wfInfo, updateEntity); + + if (nextTriggerTime == null) { + log.warn("[Workflow-{}] this workflow won't be scheduled anymore, system will set the status to DISABLE!", wfInfo.getId()); + wfInfo.setStatus(SwitchableStatus.DISABLE.getV()); + }else { + updateEntity.setNextTriggerTime(nextTriggerTime.getTime()); + } + + updateEntity.setGmtModified(new Date()); + workflowInfoRepository.save(updateEntity); + } + /** * 计算下次触发时间 * @param preTriggerTime 前一次触发时间