From aa805062b1859a0805a6a3f0e71217e44749f29d Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 31 May 2020 13:54:06 +0800 Subject: [PATCH] [dev] WorkflowInstanceController --- .../kfcfans/oms/common/OmsConstant.java | 15 ++++ .../oms/common/WorkflowInstanceStatus.java | 1 - .../oms/common/model/PEWorkflowDAG.java | 5 ++ .../kfcfans/oms/common/utils/JsonUtils.java | 4 +- .../oms/common/utils/WorkflowDAGUtils.java | 33 +++++++++ .../core/model/WorkflowInstanceInfoDO.java | 5 ++ .../repository/InstanceInfoRepository.java | 1 + .../WorkflowInstanceInfoRepository.java | 7 ++ .../oms/server/service/ContainerService.java | 5 +- .../server/service/InstanceLogService.java | 15 ++-- .../workflow/WorkflowInstanceManager.java | 7 ++ .../workflow/WorkflowInstanceService.java | 73 +++++++++++++++++++ .../web/controller/ContainerController.java | 3 +- .../web/controller/InstanceController.java | 35 +++++---- .../WorkflowInstanceController.java | 60 +++++++++++++++ .../request/QueryWorkflowInstanceRequest.java | 24 ++++++ ...InstanceLogVO.java => InstanceInfoVO.java} | 4 +- .../web/response/WorkflowInstanceInfoVO.java | 59 +++++++++++++++ .../tracker/task/FrequentTaskTracker.java | 10 +-- 19 files changed, 325 insertions(+), 41 deletions(-) create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OmsConstant.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowInstanceController.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/QueryWorkflowInstanceRequest.java rename oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/{InstanceLogVO.java => InstanceInfoVO.java} (93%) create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInstanceInfoVO.java diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OmsConstant.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OmsConstant.java new file mode 100644 index 00000000..904c4352 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OmsConstant.java @@ -0,0 +1,15 @@ +package com.github.kfcfans.oms.common; + +/** + * 公共常量 + * + * @author tjq + * @since 2020/5/31 + */ +public class OmsConstant { + + public static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; + public static final String TIME_PATTERN_PLUS = "yyyy-MM-dd HH:mm:ss.SSS"; + + public static final String NONE = "N/A"; +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/WorkflowInstanceStatus.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/WorkflowInstanceStatus.java index a4aabf61..34b93678 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/WorkflowInstanceStatus.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/WorkflowInstanceStatus.java @@ -14,7 +14,6 @@ import java.util.List; */ @Getter @AllArgsConstructor - public enum WorkflowInstanceStatus { WAITING(1, "等待调度"), diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java index 894de996..913989e4 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java @@ -29,6 +29,11 @@ public class PEWorkflowDAG { public static class Node { private Long jobId; private String jobName; + + // 仅向前端输出时需要 + private Long instanceId; + private boolean finished; + private String result; } // 边 jobId -> jobId diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/JsonUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/JsonUtils.java index 0d9ad291..f8a2f728 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/JsonUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/JsonUtils.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.common.utils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.kfcfans.oms.common.OmsException; import org.apache.commons.lang3.exception.ExceptionUtils; /** @@ -52,7 +53,6 @@ public class JsonUtils { }catch (Exception e) { ExceptionUtils.rethrow(e); } - // impossible - return null; + throw new OmsException("impossible"); } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java index 7e29d5b9..c1ef4ac9 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/WorkflowDAGUtils.java @@ -5,9 +5,12 @@ import com.github.kfcfans.oms.common.model.PEWorkflowDAG; import com.github.kfcfans.oms.common.model.WorkflowDAG; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Queues; import com.google.common.collect.Sets; +import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; /** @@ -64,6 +67,36 @@ public class WorkflowDAGUtils { return new WorkflowDAG(id2Node.get(rootIds.iterator().next())); } + /** + * 将引用式DAG图转化为点线式DAG图 + * @param dag 引用式DAG图 + * @return 点线式DAG图 + */ + public static PEWorkflowDAG convert2PE(WorkflowDAG dag) { + + List nodes = Lists.newLinkedList(); + List edges = Lists.newLinkedList(); + + Queue queue = Queues.newLinkedBlockingQueue(); + queue.add(dag.getRoot()); + + while (!queue.isEmpty()) { + WorkflowDAG.Node node = queue.poll(); + queue.addAll(node.getSuccessors()); + + // 添加点 + PEWorkflowDAG.Node peNode = new PEWorkflowDAG.Node(node.getJobId(), node.getJobName(), node.getInstanceId(), node.isFinished(), node.getResult()); + nodes.add(peNode); + + // 添加线 + node.getSuccessors().forEach(successor -> { + PEWorkflowDAG.Edge edge = new PEWorkflowDAG.Edge(node.getJobId(), successor.getJobId()); + edges.add(edge); + }); + } + return new PEWorkflowDAG(nodes, edges); + } + /** * 校验 DAG 是否有效 * @param peWorkflowDAG 点线表示法的 DAG 图 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInstanceInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInstanceInfoDO.java index 4f671ec2..39c77fae 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInstanceInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/WorkflowInstanceInfoDO.java @@ -41,6 +41,11 @@ public class WorkflowInstanceInfoDO { @Column(columnDefinition="TEXT") private String result; + // 实际触发时间 + private Long actualTriggerTime; + // 结束时间 + private Long finishedTime; + private Date gmtCreate; private Date gmtModified; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/InstanceInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/InstanceInfoRepository.java index 038ee3eb..a2bb43ff 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/InstanceInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/InstanceInfoRepository.java @@ -62,6 +62,7 @@ public interface InstanceInfoRepository extends JpaRepository findByAppIdAndType(long appId, int type, Pageable pageable); Page findByJobIdAndType(long jobId, int type, Pageable pageable); // 只会有一条数据,只是为了统一 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInstanceInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInstanceInfoRepository.java index 638a902d..546da313 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInstanceInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInstanceInfoRepository.java @@ -1,6 +1,8 @@ package com.github.kfcfans.oms.server.persistence.core.repository; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; @@ -28,4 +30,9 @@ public interface WorkflowInstanceInfoRepository extends JpaRepository status); + + // list 三兄弟 + Page findByAppId(Long appId, Pageable pageable); + Page findByAppIdAndWfInstanceId(Long appId, Long wfInstanceId, Pageable pageable); + Page findByAppIdAndWorkflowId(Long appId, Long workflowId, Pageable pageable); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java index 8d337a9b..082c24af 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.server.service; import akka.actor.ActorSelection; +import com.github.kfcfans.oms.common.OmsConstant; import com.github.kfcfans.oms.common.model.DeployedContainerInfo; import com.github.kfcfans.oms.common.model.GitRepoInfo; import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest; @@ -69,8 +70,6 @@ public class ContainerService { private static final int DEPLOY_BATCH_NUM = 50; // 部署间隔 private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000; - // 时间格式 - private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; /** * 保存容器 @@ -213,7 +212,7 @@ public class ContainerService { Date lastDeployTime = container.getLastDeployTime(); if (lastDeployTime != null) { if ((System.currentTimeMillis() - lastDeployTime.getTime()) < DEPLOY_MIN_INTERVAL) { - remote.sendText("SYSTEM: [warn] deploy too frequent, last deploy time is: " + DateFormatUtils.format(lastDeployTime, TIME_PATTERN)); + remote.sendText("SYSTEM: [warn] deploy too frequent, last deploy time is: " + DateFormatUtils.format(lastDeployTime, OmsConstant.TIME_PATTERN)); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java index 44283e03..ee8a57f9 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.server.service; +import com.github.kfcfans.oms.common.OmsConstant; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.model.InstanceLogContent; import com.github.kfcfans.oms.common.utils.CommonUtils; @@ -56,7 +57,7 @@ public class InstanceLogService { private final ExecutorService workerPool; // 格式化时间戳 - private static final FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS"); + private static final FastDateFormat dateFormat = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS); // 每一个展示的行数 private static final int MAX_LINE_COUNT = 100; // 过期时间 @@ -118,7 +119,7 @@ public class InstanceLogService { ++lines; } }catch (Exception e) { - log.warn("[InstanceLogService] read logFile from disk failed.", e); + log.warn("[InstanceLog-{}] read logFile from disk failed.", instanceId, e); return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e)); } @@ -128,7 +129,7 @@ public class InstanceLogService { }catch (TimeoutException te) { return StringPage.simple("log file is being prepared, please try again later."); }catch (Exception e) { - log.warn("[InstanceLogService] fetchInstanceLog failed for instance(instanceId={}).", instanceId, e); + log.warn("[InstanceLog-{}] fetch instance log failed.", instanceId, e); return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e)); } } @@ -180,20 +181,20 @@ public class InstanceLogService { if (gridFsManager.available()) { try { gridFsManager.store(stableLogFile, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId)); - log.info("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB succeed, using: {}.", instanceId, sw.stop()); + log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop()); }catch (Exception e) { - log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e); + log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e); } } }catch (Exception e) { - log.warn("[InstanceLogService] sync local instanceLogs(instanceId={}) failed.", instanceId, e); + log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e); } // 删除本地数据库数据 try { CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); instanceId2LastReportTime.remove(instanceId); }catch (Exception e) { - log.warn("[InstanceLogService] delete local instanceLog(instanceId={}) failed.", instanceId, e); + log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java index 60b38304..d33c05e2 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java @@ -58,6 +58,7 @@ public class WorkflowInstanceManager { newWfInstance.setAppId(wfInfo.getAppId()); newWfInstance.setWfInstanceId(wfInstanceId); newWfInstance.setWorkflowId(wfInfo.getId()); + newWfInstance.setActualTriggerTime(System.currentTimeMillis()); newWfInstance.setGmtCreate(now); newWfInstance.setGmtModified(now); @@ -73,6 +74,7 @@ public class WorkflowInstanceManager { newWfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV()); newWfInstance.setResult(e.getMessage()); + newWfInstance.setFinishedTime(System.currentTimeMillis()); } workflowInstanceInfoRepository.save(newWfInstance); return wfInstanceId; @@ -103,6 +105,7 @@ public class WorkflowInstanceManager { if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) { wfInstanceInfo.setStatus(WorkflowInstanceStatus.FAILED.getV()); wfInstanceInfo.setResult(String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, instanceConcurrency, wfInfo.getMaxWfInstanceNum())); + wfInstanceInfo.setFinishedTime(System.currentTimeMillis()); workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo); return; @@ -130,6 +133,7 @@ public class WorkflowInstanceManager { wfInstanceInfo.setStatus(WorkflowInstanceStatus.FAILED.getV()); wfInstanceInfo.setResult(e.getMessage()); + wfInstanceInfo.setFinishedTime(System.currentTimeMillis()); log.error("[Workflow-{}] submit workflow: {} failed.", wfInfo.getId(), wfInfo, e); @@ -185,6 +189,7 @@ public class WorkflowInstanceManager { wfInstance.setDag(JsonUtils.toJSONStringUnsafe(dag)); wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV()); wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_FAILED); + wfInstance.setFinishedTime(System.currentTimeMillis()); workflowInstanceInfoRepository.saveAndFlush(wfInstance); log.warn("[Workflow-{}] workflow(wfInstanceId={}) process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId); @@ -229,6 +234,7 @@ public class WorkflowInstanceManager { wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV()); // 最终任务的结果作为整个 workflow 的结果 wfInstance.setResult(result); + wfInstance.setFinishedTime(System.currentTimeMillis()); log.info("[Workflow-{}] workflowInstance(wfInstanceId={}) process successfully.", wfId, wfInstanceId); } @@ -241,6 +247,7 @@ public class WorkflowInstanceManager { }catch (Exception e) { wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV()); wfInstance.setResult("MOVE NEXT STEP FAILED: " + e.getMessage()); + wfInstance.setFinishedTime(System.currentTimeMillis()); workflowInstanceInfoRepository.saveAndFlush(wfInstance); log.error("[Workflow-{}] update failed for workflowInstance({}).", wfId, wfInstanceId, e); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java new file mode 100644 index 00000000..02ac27b5 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java @@ -0,0 +1,73 @@ +package com.github.kfcfans.oms.server.service.workflow; + +import com.github.kfcfans.oms.common.OmsException; +import com.github.kfcfans.oms.common.SystemInstanceResult; +import com.github.kfcfans.oms.common.WorkflowInstanceStatus; +import com.github.kfcfans.oms.common.model.WorkflowDAG; +import com.github.kfcfans.oms.common.utils.JsonUtils; +import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO; +import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository; +import com.github.kfcfans.oms.server.service.instance.InstanceService; +import com.google.common.collect.Queues; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Date; +import java.util.Objects; +import java.util.Queue; + +/** + * 工作流实例服务 + * + * @author tjq + * @since 2020/5/31 + */ +@Slf4j +@Service +public class WorkflowInstanceService { + + @Resource + private InstanceService instanceService; + @Resource + private WorkflowInstanceInfoRepository wfInstanceInfoRepository; + + + /** + * 停止工作流实例 + * @param wfInstanceId 工作流实例ID + * @param appId 所属应用ID + */ + public void stopWorkflowInstance(Long wfInstanceId, Long appId) { + WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId)); + if (!Objects.equals(appId, wfInstance.getAppId())) { + throw new OmsException("Permission Denied!"); + } + if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { + throw new OmsException("already stopped"); + } + + // 修改数据库状态 + wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV()); + wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER); + wfInstance.setGmtModified(new Date()); + wfInstanceInfoRepository.saveAndFlush(wfInstance); + + // 停止所有已启动且未完成的服务 + WorkflowDAG workflowDAG = JsonUtils.parseObjectUnsafe(wfInstance.getDag(), WorkflowDAG.class); + Queue queue = Queues.newLinkedBlockingQueue(); + queue.add(workflowDAG.getRoot()); + while (!queue.isEmpty()) { + WorkflowDAG.Node node = queue.poll(); + + if (node.getInstanceId() != null && !node.isFinished()) { + log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId()); + instanceService.stopInstance(node.getInstanceId()); + } + queue.addAll(node.getSuccessors()); + } + + log.info("[WfInstance-{}] stop workflow instance successfully~", wfInstanceId); + } + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java index c04f52aa..cfd15d55 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ContainerController.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.server.web.controller; +import com.github.kfcfans.oms.common.OmsConstant; import com.github.kfcfans.oms.common.response.ResultDTO; import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.common.constans.ContainerSourceType; @@ -119,7 +120,7 @@ public class ContainerController { if (containerInfoDO.getLastDeployTime() == null) { vo.setLastDeployTime("N/A"); }else { - vo.setLastDeployTime(DateFormatUtils.format(containerInfoDO.getLastDeployTime(), "yyyy-MM-dd HH:mm:ss")); + vo.setLastDeployTime(DateFormatUtils.format(containerInfoDO.getLastDeployTime(), OmsConstant.TIME_PATTERN)); } ContainerStatus status = ContainerStatus.of(containerInfoDO.getStatus()); vo.setStatus(status.name()); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java index 10e3fd79..3b6b014a 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.server.web.controller; import com.github.kfcfans.oms.common.InstanceStatus; +import com.github.kfcfans.oms.common.OmsConstant; import com.github.kfcfans.oms.common.response.ResultDTO; import com.github.kfcfans.oms.common.model.InstanceDetail; import com.github.kfcfans.oms.server.akka.OhMyServer; @@ -15,7 +16,7 @@ import com.github.kfcfans.oms.server.service.CacheService; import com.github.kfcfans.oms.server.service.InstanceLogService; import com.github.kfcfans.oms.server.service.instance.InstanceService; import com.github.kfcfans.oms.server.web.request.QueryInstanceRequest; -import com.github.kfcfans.oms.server.web.response.InstanceLogVO; +import com.github.kfcfans.oms.server.web.response.InstanceInfoVO; import org.apache.commons.lang3.time.DateFormatUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Value; @@ -57,8 +58,6 @@ public class InstanceController { @Resource private InstanceInfoRepository instanceInfoRepository; - private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; - @GetMapping("/stop") public ResultDTO stopInstance(Long instanceId) { instanceService.stopInstance(instanceId); @@ -106,7 +105,7 @@ public class InstanceController { } @PostMapping("/list") - public ResultDTO> list(@RequestBody QueryInstanceRequest request) { + public ResultDTO> list(@RequestBody QueryInstanceRequest request) { Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified"); PageRequest pageable = PageRequest.of(request.getIndex(), request.getPageSize(), sort); @@ -125,36 +124,36 @@ public class InstanceController { return ResultDTO.success(convertPage(instanceInfoRepository.findByInstanceIdAndType(request.getInstanceId(), request.getType().getV(), pageable))); } - private PageResult convertPage(Page page) { - List content = page.getContent().stream().map(instanceLogDO -> { - InstanceLogVO instanceLogVO = new InstanceLogVO(); - BeanUtils.copyProperties(instanceLogDO, instanceLogVO); + private PageResult convertPage(Page page) { + List content = page.getContent().stream().map(instanceLogDO -> { + InstanceInfoVO instanceInfoVO = new InstanceInfoVO(); + BeanUtils.copyProperties(instanceLogDO, instanceInfoVO); // 状态转化为中文 - instanceLogVO.setStatusStr(InstanceStatus.of(instanceLogDO.getStatus()).getDes()); + instanceInfoVO.setStatusStr(InstanceStatus.of(instanceLogDO.getStatus()).getDes()); // 额外设置任务名称,提高可读性 - instanceLogVO.setJobName(cacheService.getJobName(instanceLogDO.getJobId())); + instanceInfoVO.setJobName(cacheService.getJobName(instanceLogDO.getJobId())); // ID 转化为 String(JS精度丢失) - instanceLogVO.setJobId(instanceLogDO.getJobId().toString()); - instanceLogVO.setInstanceId(instanceLogDO.getInstanceId().toString()); + instanceInfoVO.setJobId(instanceLogDO.getJobId().toString()); + instanceInfoVO.setInstanceId(instanceLogDO.getInstanceId().toString()); // 格式化时间 if (instanceLogDO.getActualTriggerTime() == null) { - instanceLogVO.setActualTriggerTime("N/A"); + instanceInfoVO.setActualTriggerTime(OmsConstant.NONE); }else { - instanceLogVO.setActualTriggerTime(DateFormatUtils.format(instanceLogDO.getActualTriggerTime(), TIME_PATTERN)); + instanceInfoVO.setActualTriggerTime(DateFormatUtils.format(instanceLogDO.getActualTriggerTime(), OmsConstant.TIME_PATTERN)); } if (instanceLogDO.getFinishedTime() == null) { - instanceLogVO.setFinishedTime("N/A"); + instanceInfoVO.setFinishedTime(OmsConstant.NONE); }else { - instanceLogVO.setFinishedTime(DateFormatUtils.format(instanceLogDO.getFinishedTime(), TIME_PATTERN)); + instanceInfoVO.setFinishedTime(DateFormatUtils.format(instanceLogDO.getFinishedTime(), OmsConstant.TIME_PATTERN)); } - return instanceLogVO; + return instanceInfoVO; }).collect(Collectors.toList()); - PageResult pageResult = new PageResult<>(page); + PageResult pageResult = new PageResult<>(page); pageResult.setData(content); return pageResult; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowInstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowInstanceController.java new file mode 100644 index 00000000..4a0fa209 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowInstanceController.java @@ -0,0 +1,60 @@ +package com.github.kfcfans.oms.server.web.controller; + +import com.github.kfcfans.oms.common.response.ResultDTO; +import com.github.kfcfans.oms.server.persistence.PageResult; +import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO; +import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository; +import com.github.kfcfans.oms.server.service.workflow.WorkflowInstanceService; +import com.github.kfcfans.oms.server.web.request.QueryWorkflowInstanceRequest; +import com.github.kfcfans.oms.server.web.response.WorkflowInstanceInfoVO; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Sort; +import org.springframework.web.bind.annotation.*; + +import javax.annotation.Resource; +import java.util.stream.Collectors; + +/** + * 工作流实例控制器 + * + * @author tjq + * @since 2020/5/31 + */ +@RestController +@RequestMapping("/wfInstance") +public class WorkflowInstanceController { + + @Resource + private WorkflowInstanceService workflowInstanceService; + @Resource + private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; + + @GetMapping("/stop") + public ResultDTO stopWfInstance(Long wfInstanceId, Long appId) { + workflowInstanceService.stopWorkflowInstance(wfInstanceId, appId); + return ResultDTO.success(null); + } + + @PostMapping("/list") + public ResultDTO> listWfInstance(@RequestBody QueryWorkflowInstanceRequest req) { + Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified"); + PageRequest pageable = PageRequest.of(req.getIndex(), req.getPageSize(), sort); + + Page ps; + if (req.getWfInstanceId() == null && req.getWorkflowId() == null) { + ps = workflowInstanceInfoRepository.findByAppId(req.getAppId(), pageable); + }else if (req.getWfInstanceId() != null) { + ps = workflowInstanceInfoRepository.findByAppIdAndWfInstanceId(req.getAppId(), req.getWfInstanceId(), pageable); + }else { + ps = workflowInstanceInfoRepository.findByAppIdAndWorkflowId(req.getAppId(), req.getWorkflowId(), pageable); + } + return ResultDTO.success(convertPage(ps)); + } + + private static PageResult convertPage(Page ps) { + PageResult pr = new PageResult<>(ps); + pr.setData(ps.getContent().stream().map(WorkflowInstanceInfoVO::from).collect(Collectors.toList())); + return pr; + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/QueryWorkflowInstanceRequest.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/QueryWorkflowInstanceRequest.java new file mode 100644 index 00000000..7dea3356 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/QueryWorkflowInstanceRequest.java @@ -0,0 +1,24 @@ +package com.github.kfcfans.oms.server.web.request; + +import lombok.Data; + +/** + * 查询工作流实例请求 + * + * @author tjq + * @since 2020/5/31 + */ +@Data +public class QueryWorkflowInstanceRequest { + + // 任务所属应用ID + private Long appId; + // 当前页码 + private Integer index; + // 页大小 + private Integer pageSize; + + // 查询条件(NORMAL/WORKFLOW) + private Long wfInstanceId; + private Long workflowId; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceInfoVO.java similarity index 93% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceInfoVO.java index 10ef752c..7272d12b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceInfoVO.java @@ -3,13 +3,13 @@ package com.github.kfcfans.oms.server.web.response; import lombok.Data; /** - * ExecuteLog 对外展示对象 + * InstanceInfo 对外展示对象 * * @author tjq * @since 2020/4/12 */ @Data -public class InstanceLogVO { +public class InstanceInfoVO { // 任务ID(JS精度丢失) private String jobId; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInstanceInfoVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInstanceInfoVO.java new file mode 100644 index 00000000..07c66d8a --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInstanceInfoVO.java @@ -0,0 +1,59 @@ +package com.github.kfcfans.oms.server.web.response; + +import com.github.kfcfans.oms.common.OmsConstant; +import com.github.kfcfans.oms.common.WorkflowInstanceStatus; +import com.github.kfcfans.oms.common.model.PEWorkflowDAG; +import com.github.kfcfans.oms.common.model.WorkflowDAG; +import com.github.kfcfans.oms.common.utils.JsonUtils; +import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO; +import lombok.Data; +import org.apache.commons.lang3.time.DateFormatUtils; +import org.springframework.beans.BeanUtils; + +/** + * 工作流实例视图层展示对象 + * + * @author tjq + * @since 2020/5/31 + */ +@Data +public class WorkflowInstanceInfoVO { + + // 任务所属应用的ID,冗余提高查询效率 + private Long appId; + + // workflowInstanceId(任务实例表都使用单独的ID作为主键以支持潜在的分表需求) + private Long wfInstanceId; + + private Long workflowId; + + // workflow 状态(WorkflowInstanceStatus) + private String status; + + private PEWorkflowDAG pEWorkflowDAG; + private String result; + + // 实际触发时间(需要格式化为人看得懂的时间) + private String actualTriggerTime; + // 结束时间(同理,需要格式化) + private String finishedTime; + + public static WorkflowInstanceInfoVO from(WorkflowInstanceInfoDO wfInstanceDO) { + WorkflowInstanceInfoVO vo = new WorkflowInstanceInfoVO(); + BeanUtils.copyProperties(wfInstanceDO, vo); + + vo.setStatus(WorkflowInstanceStatus.of(wfInstanceDO.getStatus()).getDes()); + vo.setPEWorkflowDAG(WorkflowDAGUtils.convert2PE(JsonUtils.parseObjectUnsafe(wfInstanceDO.getDag(), WorkflowDAG.class))); + + // 格式化时间 + vo.setActualTriggerTime(DateFormatUtils.format(wfInstanceDO.getActualTriggerTime(), OmsConstant.TIME_PATTERN)); + if (wfInstanceDO.getFinishedTime() == null) { + vo.setFinishedTime(OmsConstant.NONE); + }else { + vo.setFinishedTime(DateFormatUtils.format(wfInstanceDO.getFinishedTime(), OmsConstant.TIME_PATTERN)); + } + + return vo; + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java index 9f93a217..8b9d7f6d 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java @@ -1,10 +1,7 @@ package com.github.kfcfans.oms.worker.core.tracker.task; import akka.actor.ActorSelection; -import com.github.kfcfans.oms.common.ExecuteType; -import com.github.kfcfans.oms.common.InstanceStatus; -import com.github.kfcfans.oms.common.RemoteConstant; -import com.github.kfcfans.oms.common.TimeExpressionType; +import com.github.kfcfans.oms.common.*; import com.github.kfcfans.oms.common.model.InstanceDetail; import com.github.kfcfans.oms.common.request.ServerScheduleJobReq; import com.github.kfcfans.oms.common.request.TaskTrackerReportInstanceStatusReq; @@ -64,7 +61,6 @@ public class FrequentTaskTracker extends TaskTracker { private static final int HISTORY_SIZE = 10; private static final String LAST_TASK_ID_PREFIX = "L"; - private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; protected FrequentTaskTracker(ServerScheduleJobReq req) { super(req); @@ -121,9 +117,9 @@ public class FrequentTaskTracker extends TaskTracker { subDetail.setSubInstanceId(subId); // 设置时间 - subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), TIME_PATTERN)); + subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), OmsConstant.TIME_PATTERN)); if (status == InstanceStatus.SUCCEED || status == InstanceStatus.FAILED) { - subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), TIME_PATTERN)); + subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), OmsConstant.TIME_PATTERN)); }else { subDetail.setFinishedTime("N/A"); }