diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java index c9a22df4..b35ea919 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java @@ -27,6 +27,8 @@ public class WorkflowInstanceInfoDTO { private String dag; private String result; + // 预计触发时间 + private Long expectedTriggerTime; // 实际触发时间 private Long actualTriggerTime; // 结束时间 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java index c2a074a4..e6f483c4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java @@ -48,6 +48,8 @@ public class WorkflowInstanceInfoDO { @Column private String result; + // 预计触发时间 + private Long expectedTriggerTime; // 实际触发时间 private Long actualTriggerTime; // 结束时间 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java index 809bfdfc..35d97455 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java @@ -30,5 +30,5 @@ public interface WorkflowInstanceInfoRepository extends JpaRepository status); // 状态检查 - List findByAppIdInAndStatusAndGmtModifiedBefore(List appIds, int status, Date before); + List findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List appIds, int status, long time); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index 489f2874..ddea791e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -162,7 +162,7 @@ public class InstanceStatusCheckService { // 重试长时间处于 WAITING 状态的工作流实例 long threshold = System.currentTimeMillis() - WORKFLOW_WAITING_TIMEOUT_MS; Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> { - List waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, WorkflowInstanceStatus.WAITING.getV(), new Date(threshold)); + List waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, WorkflowInstanceStatus.WAITING.getV(), threshold); if (!CollectionUtils.isEmpty(waitingWfInstanceList)) { List wfInstanceIds = waitingWfInstanceList.stream().map(WorkflowInstanceInfoDO::getWfInstanceId).collect(Collectors.toList()); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index 2115a664..403b8673 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -193,7 +193,7 @@ public class OmsScheduleService { wfInfos.forEach(wfInfo -> { // 1. 先生成调度记录,防止不调度的情况发生 - Long wfInstanceId = workflowInstanceManager.create(wfInfo, null); + Long wfInstanceId = workflowInstanceManager.create(wfInfo, null, wfInfo.getNextTriggerTime()); // 2. 推入时间轮,准备调度执行 long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index 37ca9396..bbfc895b 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -68,9 +68,10 @@ public class WorkflowInstanceManager { * 创建工作流任务实例 * @param wfInfo 工作流任务元数据(描述信息) * @param initParams 启动参数 + * @param expectTriggerTime 预计执行时间 * @return wfInstanceId */ - public Long create(WorkflowInfoDO wfInfo, String initParams) { + public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime) { Long wfId = wfInfo.getId(); Long wfInstanceId = idGenerateService.allocate(); @@ -82,6 +83,7 @@ public class WorkflowInstanceManager { newWfInstance.setWfInstanceId(wfInstanceId); newWfInstance.setWorkflowId(wfId); newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); + newWfInstance.setExpectedTriggerTime(expectTriggerTime); newWfInstance.setActualTriggerTime(System.currentTimeMillis()); newWfInstance.setWfInitParams(initParams); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java index 51929452..53ae028b 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java @@ -168,7 +168,7 @@ public class WorkflowService { private Long realRunWorkflow(WorkflowInfoDO wfInfo, String initParams, long delay) { log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay); - Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams); + Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay); if (delay <= 0) { workflowInstanceManager.start(wfInfo, wfInstanceId, initParams); }else { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java index 20c55b06..ed1e2daa 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java @@ -32,6 +32,8 @@ public class WorkflowInstanceInfoVO { private PEWorkflowDAG pEWorkflowDAG; private String result; + // 预计触发时间 + private String expectedTriggerTime; // 实际触发时间(需要格式化为人看得懂的时间) private String actualTriggerTime; // 结束时间(同理,需要格式化) @@ -49,6 +51,7 @@ public class WorkflowInstanceInfoVO { vo.setWorkflowId(String.valueOf(wfInstanceDO.getWorkflowId())); // 格式化时间 + vo.setExpectedTriggerTime(DateFormatUtils.format(wfInstanceDO.getExpectedTriggerTime(), OmsConstant.TIME_PATTERN)); vo.setActualTriggerTime(DateFormatUtils.format(wfInstanceDO.getActualTriggerTime(), OmsConstant.TIME_PATTERN)); if (wfInstanceDO.getFinishedTime() == null) { vo.setFinishedTime(OmsConstant.NONE);