fix: workflow schedule in advance #108

This commit is contained in:
tjq 2020-11-26 00:08:39 +08:00
parent 63cc97024a
commit 5f7de7231c
8 changed files with 14 additions and 5 deletions

View File

@ -27,6 +27,8 @@ public class WorkflowInstanceInfoDTO {
private String dag;
private String result;
// 预计触发时间
private Long expectedTriggerTime;
// 实际触发时间
private Long actualTriggerTime;
// 结束时间

View File

@ -48,6 +48,8 @@ public class WorkflowInstanceInfoDO {
@Column
private String result;
// 预计触发时间
private Long expectedTriggerTime;
// 实际触发时间
private Long actualTriggerTime;
// 结束时间

View File

@ -30,5 +30,5 @@ public interface WorkflowInstanceInfoRepository extends JpaRepository<WorkflowIn
int countByWorkflowIdAndStatusIn(Long workflowId, List<Integer> status);
// 状态检查
List<WorkflowInstanceInfoDO> findByAppIdInAndStatusAndGmtModifiedBefore(List<Long> appIds, int status, Date before);
List<WorkflowInstanceInfoDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> appIds, int status, long time);
}

View File

@ -162,7 +162,7 @@ public class InstanceStatusCheckService {
// 重试长时间处于 WAITING 状态的工作流实例
long threshold = System.currentTimeMillis() - WORKFLOW_WAITING_TIMEOUT_MS;
Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> {
List<WorkflowInstanceInfoDO> waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, WorkflowInstanceStatus.WAITING.getV(), new Date(threshold));
List<WorkflowInstanceInfoDO> waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, WorkflowInstanceStatus.WAITING.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingWfInstanceList)) {
List<Long> wfInstanceIds = waitingWfInstanceList.stream().map(WorkflowInstanceInfoDO::getWfInstanceId).collect(Collectors.toList());

View File

@ -193,7 +193,7 @@ public class OmsScheduleService {
wfInfos.forEach(wfInfo -> {
// 1. 先生成调度记录防止不调度的情况发生
Long wfInstanceId = workflowInstanceManager.create(wfInfo, null);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, null, wfInfo.getNextTriggerTime());
// 2. 推入时间轮准备调度执行
long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis();

View File

@ -68,9 +68,10 @@ public class WorkflowInstanceManager {
* 创建工作流任务实例
* @param wfInfo 工作流任务元数据描述信息
* @param initParams 启动参数
* @param expectTriggerTime 预计执行时间
* @return wfInstanceId
*/
public Long create(WorkflowInfoDO wfInfo, String initParams) {
public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime) {
Long wfId = wfInfo.getId();
Long wfInstanceId = idGenerateService.allocate();
@ -82,6 +83,7 @@ public class WorkflowInstanceManager {
newWfInstance.setWfInstanceId(wfInstanceId);
newWfInstance.setWorkflowId(wfId);
newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
newWfInstance.setExpectedTriggerTime(expectTriggerTime);
newWfInstance.setActualTriggerTime(System.currentTimeMillis());
newWfInstance.setWfInitParams(initParams);

View File

@ -168,7 +168,7 @@ public class WorkflowService {
private Long realRunWorkflow(WorkflowInfoDO wfInfo, String initParams, long delay) {
log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay);
if (delay <= 0) {
workflowInstanceManager.start(wfInfo, wfInstanceId, initParams);
}else {

View File

@ -32,6 +32,8 @@ public class WorkflowInstanceInfoVO {
private PEWorkflowDAG pEWorkflowDAG;
private String result;
// 预计触发时间
private String expectedTriggerTime;
// 实际触发时间需要格式化为人看得懂的时间
private String actualTriggerTime;
// 结束时间同理需要格式化
@ -49,6 +51,7 @@ public class WorkflowInstanceInfoVO {
vo.setWorkflowId(String.valueOf(wfInstanceDO.getWorkflowId()));
// 格式化时间
vo.setExpectedTriggerTime(DateFormatUtils.format(wfInstanceDO.getExpectedTriggerTime(), OmsConstant.TIME_PATTERN));
vo.setActualTriggerTime(DateFormatUtils.format(wfInstanceDO.getActualTriggerTime(), OmsConstant.TIME_PATTERN));
if (wfInstanceDO.getFinishedTime() == null) {
vo.setFinishedTime(OmsConstant.NONE);