From 05fc44e11fb52522aaa45e2c828efa80c01be9cc Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 2 Jun 2020 14:22:32 +0800 Subject: [PATCH] [dev] OhMyClient add workflow api --- .../github/kfcfans/oms/client/OhMyClient.java | 128 +++++++++++++++++- .../src/test/java/TestWorkflow.java | 72 ++++++++++ .../kfcfans/oms/common/OpenAPIConstant.java | 15 +- .../oms/common/TimeExpressionType.java | 3 +- .../oms/common/model/PEWorkflowDAG.java | 9 +- .../request/http/SaveJobInfoRequest.java | 2 +- .../request/http/SaveWorkflowRequest.java | 13 +- .../oms/common/response/WorkflowInfoDTO.java | 45 ++++++ .../response/WorkflowInstanceInfoDTO.java | 35 +++++ .../oms/server/service/CacheService.java | 8 +- .../oms/server/service/DispatchService.java | 8 +- .../oms/server/service/JobService.java | 6 +- .../workflow/WorkflowInstanceManager.java | 20 +-- .../workflow/WorkflowInstanceService.java | 22 ++- .../service/workflow/WorkflowService.java | 26 ++++ .../web/controller/OpenAPIController.java | 57 +++++++- .../src/main/resources/logback-dev.xml | 5 + 17 files changed, 433 insertions(+), 41 deletions(-) create mode 100644 oh-my-scheduler-client/src/test/java/TestWorkflow.java create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/response/WorkflowInfoDTO.java create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/response/WorkflowInstanceInfoDTO.java diff --git a/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java b/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java index 1d061784..79dd9159 100644 --- a/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java +++ b/oh-my-scheduler-client/src/main/java/com/github/kfcfans/oms/client/OhMyClient.java @@ -4,9 +4,11 @@ import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.OpenAPIConstant; import com.github.kfcfans.oms.common.request.http.SaveJobInfoRequest; +import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.oms.common.response.InstanceInfoDTO; import com.github.kfcfans.oms.common.response.JobInfoDTO; import com.github.kfcfans.oms.common.response.ResultDTO; +import com.github.kfcfans.oms.common.response.WorkflowInfoDTO; import com.github.kfcfans.oms.common.utils.HttpUtils; import com.github.kfcfans.oms.common.utils.JsonUtils; import com.google.common.collect.Lists; @@ -57,7 +59,7 @@ public class OhMyClient { allAddress = addressList; for (String addr : addressList) { - String url = getUrl(addr, addr) + "?appName=" + appName; + String url = getUrl(OpenAPIConstant.ASSERT, addr) + "?appName=" + appName; try { String result = HttpUtils.get(url); if (StringUtils.isNotEmpty(result)) { @@ -186,7 +188,7 @@ public class OhMyClient { /** * 停止应用实例 * @param instanceId 应用实例ID - * @return true -> 停止成功,false -> 停止失败 + * @return true 停止成功,false 停止失败 * @throws Exception 异常 */ public ResultDTO stopInstance(Long instanceId) throws Exception { @@ -226,6 +228,128 @@ public class OhMyClient { return JsonUtils.parseObject(post, ResultDTO.class); } + /* ************* Workflow 区 ************* */ + /** + * 保存工作流(包括创建和修改) + * @param request 创建/修改 Workflow 请求 + * @return 工作流ID + * @throws Exception 异常 + */ + public ResultDTO saveWorkflow(SaveWorkflowRequest request) throws Exception { + request.setAppId(appId); + MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); + String json = JsonUtils.toJSONStringUnsafe(request); + String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(json, jsonType)); + return JsonUtils.parseObject(post, ResultDTO.class); + } + + /** + * 根据 workflowId 查询工作流信息 + * @param workflowId workflowId + * @return 工作流信息 + * @throws Exception 异常 + */ + public ResultDTO fetchWorkflow(Long workflowId) throws Exception { + RequestBody body = new FormBody.Builder() + .add("workflowId", workflowId.toString()) + .add("appId", appId.toString()) + .build(); + String post = postHA(OpenAPIConstant.FETCH_WORKFLOW, body); + return JsonUtils.parseObject(post, ResultDTO.class); + } + + /** + * 禁用某个工作流 + * @param workflowId 工作流ID + * @return 标准返回对象 + * @throws Exception 异常 + */ + public ResultDTO disableWorkflow(Long workflowId) throws Exception { + RequestBody body = new FormBody.Builder() + .add("workflowId", workflowId.toString()) + .add("appId", appId.toString()) + .build(); + String post = postHA(OpenAPIConstant.DISABLE_WORKFLOW, body); + return JsonUtils.parseObject(post, ResultDTO.class); + } + + /** + * 启用某个工作流 + * @param workflowId workflowId + * @return 标准返回对象 + * @throws Exception 异常 + */ + public ResultDTO enableWorkflow(Long workflowId) throws Exception { + RequestBody body = new FormBody.Builder() + .add("workflowId", workflowId.toString()) + .add("appId", appId.toString()) + .build(); + String post = postHA(OpenAPIConstant.ENABLE_WORKFLOW, body); + return JsonUtils.parseObject(post, ResultDTO.class); + } + + /** + * 删除某个工作流 + * @param workflowId workflowId + * @return 标准返回对象 + * @throws Exception 异常 + */ + public ResultDTO deleteWorkflow(Long workflowId) throws Exception { + RequestBody body = new FormBody.Builder() + .add("workflowId", workflowId.toString()) + .add("appId", appId.toString()) + .build(); + String post = postHA(OpenAPIConstant.DELETE_WORKFLOW, body); + return JsonUtils.parseObject(post, ResultDTO.class); + } + + /** + * 运行工作流 + * @param workflowId workflowId + * @return 工作流实例ID + * @throws Exception 异常 + */ + public ResultDTO runWorkflow(Long workflowId) throws Exception { + FormBody.Builder builder = new FormBody.Builder() + .add("workflowId", workflowId.toString()) + .add("appId", appId.toString()); + String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build()); + return JsonUtils.parseObject(post, ResultDTO.class); + } + + /* ************* Workflow Instance 区 ************* */ + /** + * 停止应用实例 + * @param wfInstanceId 工作流实例ID + * @return true 停止成功 ; false 停止失败 + * @throws Exception 异常 + */ + public ResultDTO stopWorkflowInstance(Long wfInstanceId) throws Exception { + RequestBody body = new FormBody.Builder() + .add("wfInstanceId", wfInstanceId.toString()) + .add("appId", appId.toString()) + .build(); + String post = postHA(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, body); + return JsonUtils.parseObject(post, ResultDTO.class); + } + + /** + * 查询任务实例的信息 + * @param wfInstanceId 任务实例ID + * @return 任务实例信息 + * @throws Exception 潜在的异常 + */ + public ResultDTO fetchWorkflowInstanceInfo(Long wfInstanceId) throws Exception { + RequestBody body = new FormBody.Builder() + .add("wfInstanceId", wfInstanceId.toString()) + .add("appId", appId.toString()) + .build(); + String post = postHA(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, body); + return JsonUtils.parseObject(post, ResultDTO.class); + } + + + private String postHA(String path, RequestBody requestBody) { // 先尝试默认地址 diff --git a/oh-my-scheduler-client/src/test/java/TestWorkflow.java b/oh-my-scheduler-client/src/test/java/TestWorkflow.java new file mode 100644 index 00000000..81bdb768 --- /dev/null +++ b/oh-my-scheduler-client/src/test/java/TestWorkflow.java @@ -0,0 +1,72 @@ +import com.github.kfcfans.oms.client.OhMyClient; +import com.github.kfcfans.oms.common.TimeExpressionType; +import com.github.kfcfans.oms.common.model.PEWorkflowDAG; +import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; +import com.google.common.collect.Lists; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.List; + +/** + * 测试 Client(workflow部分) + * + * @author tjq + * @since 2020/6/2 + */ +public class TestWorkflow { + + private static OhMyClient ohMyClient; + + @BeforeAll + public static void initClient() throws Exception { + ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test"); + } + + @Test + public void testSaveWorkflow() throws Exception { + + // DAG 图 + List nodes = Lists.newLinkedList(); + + nodes.add(new PEWorkflowDAG.Node(1L, "node-1", null, false, null)); + nodes.add(new PEWorkflowDAG.Node(2L, "node-2", null, false, null)); + + PEWorkflowDAG peWorkflowDAG = new PEWorkflowDAG(nodes, null); + SaveWorkflowRequest req = new SaveWorkflowRequest(); + + req.setWfName("workflow-by-client"); + req.setWfDescription("created by client"); + req.setPEWorkflowDAG(peWorkflowDAG); + req.setEnable(true); + req.setTimeExpressionType(TimeExpressionType.API); + + System.out.println(ohMyClient.saveWorkflow(req)); + } + + @Test + public void testDisableWorkflow() throws Exception { + System.out.println(ohMyClient.disableWorkflow(1L)); + } + + @Test + public void testDeleteWorkflow() throws Exception { + System.out.println(ohMyClient.deleteWorkflow(1L)); + } + + @Test + public void testEnableWorkflow() throws Exception { + System.out.println(ohMyClient.enableWorkflow(1L)); + } + + @Test + public void testFetchWorkflowInfo() throws Exception { + System.out.println(ohMyClient.fetchWorkflow(1L)); + } + + @Test + public void testRunWorkflow() throws Exception { + System.out.println(ohMyClient.runWorkflow(1L)); + } + +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OpenAPIConstant.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OpenAPIConstant.java index a6c5624d..9ea18a7e 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OpenAPIConstant.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/OpenAPIConstant.java @@ -11,6 +11,8 @@ public class OpenAPIConstant { public static final String WEB_PATH = "/openApi"; public static final String ASSERT = "/assert"; + + /* ************* JOB 区 ************* */ public static final String SAVE_JOB = "/saveJob"; public static final String FETCH_JOB = "/fetchJob"; public static final String DISABLE_JOB = "/disableJob"; @@ -18,9 +20,20 @@ public class OpenAPIConstant { public static final String DELETE_JOB = "/deleteJob"; public static final String RUN_JOB = "/runJob"; - + /* ************* Instance 区 ************* */ public static final String STOP_INSTANCE = "/stopInstance"; public static final String FETCH_INSTANCE_STATUS = "/fetchInstanceStatus"; public static final String FETCH_INSTANCE_INFO = "/fetchInstanceInfo"; + /* ************* Workflow 区 ************* */ + public static final String SAVE_WORKFLOW = "/saveWorkflow"; + public static final String FETCH_WORKFLOW = "/fetchWorkflow"; + public static final String DISABLE_WORKFLOW = "/disableWorkflow"; + public static final String ENABLE_WORKFLOW = "/enableWorkflow"; + public static final String DELETE_WORKFLOW = "/deleteWorkflow"; + public static final String RUN_WORKFLOW = "/runWorkflow"; + + /* ************* WorkflowInstance 区 ************* */ + public static final String STOP_WORKFLOW_INSTANCE = "/stopWfInstance"; + public static final String FETCH_WORKFLOW_INSTANCE_INFO = "/fetchWfInstanceInfo"; } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/TimeExpressionType.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/TimeExpressionType.java index 76276672..d5a186fb 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/TimeExpressionType.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/TimeExpressionType.java @@ -19,7 +19,8 @@ public enum TimeExpressionType { API(1), CRON(2), FIX_RATE(3), - FIX_DELAY(4); + FIX_DELAY(4), + WORKFLOW(5); int v; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java index 913989e4..4d9cb4cc 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/model/PEWorkflowDAG.java @@ -1,9 +1,12 @@ package com.github.kfcfans.oms.common.model; +import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.List; /** @@ -15,7 +18,6 @@ import java.util.List; */ @Data @NoArgsConstructor -@AllArgsConstructor public class PEWorkflowDAG { // DAG 图(点线表示法) @@ -44,4 +46,9 @@ public class PEWorkflowDAG { private Long from; private Long to; } + + public PEWorkflowDAG(@Nonnull List nodes, @Nullable List edges) { + this.nodes = nodes; + this.edges = edges == null ? Lists.newLinkedList() : edges; + } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveJobInfoRequest.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveJobInfoRequest.java index c72e44a9..8a01a4fc 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveJobInfoRequest.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveJobInfoRequest.java @@ -23,7 +23,7 @@ public class SaveJobInfoRequest { private String jobName; // 任务描述 private String jobDescription; - // 任务所属的应用ID(Client无需填写该参数) + // 任务所属的应用ID(Client无需填写该参数,自动填充) private Long appId; // 任务自带的参数 private String jobParams; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java index 68ee0303..3724466c 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.common.request.http; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.model.PEWorkflowDAG; +import com.google.common.collect.Lists; import lombok.Data; import java.util.List; @@ -17,27 +18,29 @@ public class SaveWorkflowRequest { private Long id; + // 工作流名称 private String wfName; + // 工作流描述 private String wfDescription; - // 所属应用ID + // 所属应用ID(OpenClient不需要用户填写,自动填充) private Long appId; // 点线表示法 private PEWorkflowDAG pEWorkflowDAG; /* ************************** 定时参数 ************************** */ - // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) + // 时间表达式类型,仅支持 CRON 和 API private TimeExpressionType timeExpressionType; // 时间表达式,CRON/NULL/LONG/LONG private String timeExpression; // 最大同时运行的工作流个数,默认 1 - private Integer maxWfInstanceNum; + private Integer maxWfInstanceNum = 1; // ENABLE / DISABLE - private boolean enable; + private boolean enable = true; // 工作流整体失败的报警 - private List notifyUserIds; + private List notifyUserIds = Lists.newLinkedList(); } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/response/WorkflowInfoDTO.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/response/WorkflowInfoDTO.java new file mode 100644 index 00000000..c414f4d5 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/response/WorkflowInfoDTO.java @@ -0,0 +1,45 @@ +package com.github.kfcfans.oms.common.response; + +import lombok.Data; + +import java.util.Date; + +/** + * workflowInfo 对外输出对象 + * + * @author tjq + * @since 2020/6/2 + */ +@Data +public class WorkflowInfoDTO { + + private Long id; + private String wfName; + private String wfDescription; + + // 所属应用ID + private Long appId; + + // 工作流的DAG图信息(点线式DAG的json) + private String peDAG; + + /* ************************** 定时参数 ************************** */ + // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) + private Integer timeExpressionType; + // 时间表达式,CRON/NULL/LONG/LONG + private String timeExpression; + + // 最大同时运行的工作流个数,默认 1 + private Integer maxWfInstanceNum; + + // 1 正常运行,2 停止(不再调度) + private Integer status; + // 下一次调度时间 + private Long nextTriggerTime; + + // 工作流整体失败的报警 + private String notifyUserIds; + + private Date gmtCreate; + private Date gmtModified; +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/response/WorkflowInstanceInfoDTO.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/response/WorkflowInstanceInfoDTO.java new file mode 100644 index 00000000..67505911 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/response/WorkflowInstanceInfoDTO.java @@ -0,0 +1,35 @@ +package com.github.kfcfans.oms.common.response; + +import lombok.Data; + +import java.util.Date; + +/** + * WorkflowInstanceInfo 对外输出对象 + * + * @author tjq + * @since 2020/6/2 + */ +@Data +public class WorkflowInstanceInfoDTO { + + private Long id; + private Long appId; + + private Long wfInstanceId; + private Long workflowId; + + // workflow 状态(WorkflowInstanceStatus) + private Integer status; + + private String dag; + private String result; + + // 实际触发时间 + private Long actualTriggerTime; + // 结束时间 + private Long finishedTime; + + private Date gmtCreate; + private Date gmtModified; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java index 28a63f1c..3bed2ae6 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java @@ -40,19 +40,19 @@ public class CacheService { public CacheService() { jobId2JobNameCache = CacheBuilder.newBuilder() .expireAfterWrite(Duration.ofMinutes(1)) - .maximumSize(1024) + .maximumSize(512) .build(); workflowId2WorkflowNameCache = CacheBuilder.newBuilder() .expireAfterWrite(Duration.ofMinutes(1)) - .maximumSize(1024) + .maximumSize(512) .build(); instanceId2AppId = CacheBuilder.newBuilder() - .maximumSize(4096) + .maximumSize(1024) .build(); jobId2AppId = CacheBuilder.newBuilder() - .maximumSize(4096) + .maximumSize(1024) .build(); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java index 6760d2f6..9dcadcc4 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java @@ -55,7 +55,7 @@ public class DispatchService { */ public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams, Long wfInstanceId) { Long jobId = jobInfo.getId(); - log.info("[DispatchService] start to dispatch job: {};instancePrams: {}.", jobInfo, instanceParams); + log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, instanceParams); String dbInstanceParams = instanceParams == null ? "" : instanceParams; @@ -66,7 +66,7 @@ public class DispatchService { // 超出最大同时运行限制,不执行调度 if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); - log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount); + log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams); InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); @@ -91,7 +91,7 @@ public class DispatchService { if (CollectionUtils.isEmpty(finalWorkers)) { String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId()); - log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription); + log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams); InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE); @@ -137,7 +137,7 @@ public class DispatchService { String taskTrackerAddress = finalWorkers.get(0); ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress); taskTrackerActor.tell(req, null); - log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString()); + log.debug("[Dispatcher-{}|{}] send request({}) to TaskTracker({}) succeed.", jobId, instanceId, req, taskTrackerActor.pathString()); // 修改状态 instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, dbInstanceParams); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java index 1177116e..2325303b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java @@ -11,7 +11,6 @@ import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; -import com.github.kfcfans.oms.server.service.id.IdGenerateService; import com.github.kfcfans.oms.server.service.instance.InstanceService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; @@ -39,8 +38,6 @@ public class JobService { @Resource private DispatchService dispatchService; @Resource - private IdGenerateService idGenerateService; - @Resource private JobInfoRepository jobInfoRepository; @Resource private InstanceInfoRepository instanceInfoRepository; @@ -157,8 +154,7 @@ public class JobService { jobInfoRepository.saveAndFlush(jobInfoDO); // 2. 关闭秒级任务 - TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); - if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) { + if (!TimeExpressionType.frequentTypes.contains(jobInfoDO.getTimeExpressionType())) { return; } List executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java index a87899da..8aa2084d 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java @@ -76,7 +76,7 @@ public class WorkflowInstanceManager { newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); }catch (Exception e) { - log.error("[Workflow-{}] parse PEDag({}) failed.", wfInfo.getId(), wfInfo.getPeDAG(), e); + log.error("[Workflow-{}|{}] parse PEDag({}) failed.", wfInfo.getId(), wfInstanceId, wfInfo.getPeDAG(), e); newWfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV()); newWfInstance.setResult(e.getMessage()); @@ -102,7 +102,7 @@ public class WorkflowInstanceManager { // 不是等待中,不再继续执行(可能上一流程已经失败) if (wfInstanceInfo.getStatus() != WorkflowInstanceStatus.WAITING.getV()) { - log.info("[Workflow-{}] workflowInstance({}) need't running any more.", wfInfo.getId(), wfInstanceInfo); + log.info("[Workflow-{}|{}] workflowInstance({}) need't running any more.", wfInfo.getId(), wfInstanceId, wfInstanceInfo); return; } @@ -132,7 +132,7 @@ public class WorkflowInstanceManager { wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV()); wfInstanceInfo.setDag(JSONObject.toJSONString(workflowDAG)); workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo); - log.info("[Workflow-{}] start workflow successfully, wfInstanceId={}", wfInfo.getId(), wfInstanceId); + log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId); // 真正开始执行根任务 roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, null)); @@ -142,7 +142,7 @@ public class WorkflowInstanceManager { wfInstanceInfo.setResult(e.getMessage()); wfInstanceInfo.setFinishedTime(System.currentTimeMillis()); - log.error("[Workflow-{}] submit workflow: {} failed.", wfInfo.getId(), wfInfo, e); + log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo); } @@ -181,7 +181,7 @@ public class WorkflowInstanceManager { head.setFinished(true); head.setResult(result); - log.debug("[Workflow-{}] node(jobId={}) finished in workflowInstance(wfInstanceId={}), success={},result={}", wfId, head.getJobId(), wfInstanceId, success, result); + log.debug("[Workflow-{}|{}] node(jobId={}) finished in workflowInstance, success={},result={}", wfId, wfInstanceId, head.getJobId(), success, result); } queue.addAll(head.getSuccessors()); @@ -197,7 +197,7 @@ public class WorkflowInstanceManager { wfInstance.setFinishedTime(System.currentTimeMillis()); workflowInstanceInfoRepository.saveAndFlush(wfInstance); - log.warn("[Workflow-{}] workflow(wfInstanceId={}) process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId); + log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId); return; } @@ -236,7 +236,7 @@ public class WorkflowInstanceManager { jobId2InstanceId.put(jobId, newInstanceId); jobId2InstanceParams.put(jobId, JSONObject.toJSONString(preJobId2Result)); - log.debug("[Workflow-{}] workflowInstance(wfInstanceId={}) start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId); + log.debug("[Workflow-{}|{}] workflowInstance start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId); }); if (allFinished.get()) { @@ -245,7 +245,7 @@ public class WorkflowInstanceManager { wfInstance.setResult(result); wfInstance.setFinishedTime(System.currentTimeMillis()); - log.info("[Workflow-{}] workflowInstance(wfInstanceId={}) process successfully.", wfId, wfInstanceId); + log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId); } wfInstance.setDag(JSONObject.toJSONString(dag)); workflowInstanceInfoRepository.saveAndFlush(wfInstance); @@ -259,7 +259,7 @@ public class WorkflowInstanceManager { wfInstance.setFinishedTime(System.currentTimeMillis()); workflowInstanceInfoRepository.saveAndFlush(wfInstance); - log.error("[Workflow-{}] update failed for workflowInstance({}).", wfId, wfInstanceId, e); + log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e); } } @@ -274,7 +274,7 @@ public class WorkflowInstanceManager { private void runInstance(Long jobId, Long instanceId, Long wfInstanceId, String instanceParams) { JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); // 洗去时间表达式类型 - jobInfo.setTimeExpressionType(TimeExpressionType.API.getV()); + jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV()); dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, wfInstanceId); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java index 57fbb113..91690976 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceService.java @@ -4,12 +4,14 @@ import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.SystemInstanceResult; import com.github.kfcfans.oms.common.WorkflowInstanceStatus; +import com.github.kfcfans.oms.common.response.WorkflowInstanceInfoDTO; import com.github.kfcfans.oms.server.model.WorkflowDAG; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.oms.server.service.instance.InstanceService; import com.google.common.collect.Queues; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; @@ -39,10 +41,7 @@ public class WorkflowInstanceService { * @param appId 所属应用ID */ public void stopWorkflowInstance(Long wfInstanceId, Long appId) { - WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId)); - if (!Objects.equals(appId, wfInstance.getAppId())) { - throw new OmsException("Permission Denied!"); - } + WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) { throw new OmsException("workflow instance already stopped"); } @@ -70,4 +69,19 @@ public class WorkflowInstanceService { log.info("[WfInstance-{}] stop workflow instance successfully~", wfInstanceId); } + public WorkflowInstanceInfoDTO fetchWorkflowInstanceInfo(Long wfInstanceId, Long appId) { + WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); + WorkflowInstanceInfoDTO dto = new WorkflowInstanceInfoDTO(); + BeanUtils.copyProperties(wfInstance, dto); + return dto; + } + + private WorkflowInstanceInfoDO fetchWfInstance(Long wfInstanceId, Long appId) { + WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId)); + if (!Objects.equals(appId, wfInstance.getAppId())) { + throw new OmsException("Permission Denied!"); + } + return wfInstance; + } + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java index 79f707bc..3849f8c6 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.oms.common.OmsException; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; +import com.github.kfcfans.oms.common.response.WorkflowInfoDTO; import com.github.kfcfans.oms.server.common.utils.WorkflowDAGUtils; import com.github.kfcfans.oms.server.common.SJ; import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; @@ -70,6 +71,19 @@ public class WorkflowService { return newEntity.getId(); } + /** + * 获取工作流元信息 + * @param wfId 工作流ID + * @param appId 应用ID + * @return 对外输出对象 + */ + public WorkflowInfoDTO fetchWorkflow(Long wfId, Long appId) { + WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); + WorkflowInfoDTO dto = new WorkflowInfoDTO(); + BeanUtils.copyProperties(wfInfo, dto); + return dto; + } + /** * 删除工作流(软删除) * @param wfId 工作流ID @@ -94,6 +108,18 @@ public class WorkflowService { workflowInfoRepository.saveAndFlush(wfInfo); } + /** + * 启用工作流 + * @param wfId 工作流ID + * @param appId 所属应用ID + */ + public void enableWorkflow(Long wfId, Long appId) { + WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); + wfInfo.setStatus(SwitchableStatus.ENABLE.getV()); + wfInfo.setGmtModified(new Date()); + workflowInfoRepository.saveAndFlush(wfInfo); + } + /** * 立即运行工作流 * @param wfId 工作流ID diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/OpenAPIController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/OpenAPIController.java index 6fde9c9e..d4b9ac06 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/OpenAPIController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/OpenAPIController.java @@ -2,15 +2,16 @@ package com.github.kfcfans.oms.server.web.controller; import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.OpenAPIConstant; -import com.github.kfcfans.oms.common.response.InstanceInfoDTO; -import com.github.kfcfans.oms.common.response.JobInfoDTO; -import com.github.kfcfans.oms.common.response.ResultDTO; +import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; +import com.github.kfcfans.oms.common.response.*; import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.oms.server.service.CacheService; import com.github.kfcfans.oms.server.service.JobService; import com.github.kfcfans.oms.server.service.instance.InstanceService; import com.github.kfcfans.oms.common.request.http.SaveJobInfoRequest; +import com.github.kfcfans.oms.server.service.workflow.WorkflowInstanceService; +import com.github.kfcfans.oms.server.service.workflow.WorkflowService; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; @@ -29,6 +30,10 @@ public class OpenAPIController { private JobService jobService; @Resource private InstanceService instanceService; + @Resource + private WorkflowService workflowService; + @Resource + private WorkflowInstanceService workflowInstanceService; @Resource private CacheService cacheService; @@ -105,6 +110,52 @@ public class OpenAPIController { return ResultDTO.success(instanceService.getInstanceInfo(instanceId)); } + /* ************* Workflow 区 ************* */ + @PostMapping(OpenAPIConstant.SAVE_WORKFLOW) + public ResultDTO saveWorkflow(@RequestBody SaveWorkflowRequest request) throws Exception { + if (request.getId() != null) { + checkJobIdValid(request.getId(), request.getAppId()); + } + return ResultDTO.success(workflowService.saveWorkflow(request)); + } + + @PostMapping(OpenAPIConstant.FETCH_WORKFLOW) + public ResultDTO fetchWorkflow(Long workflowId, Long appId) { + return ResultDTO.success(workflowService.fetchWorkflow(workflowId, appId)); + } + + @PostMapping(OpenAPIConstant.DELETE_WORKFLOW) + public ResultDTO deleteWorkflow(Long workflowId, Long appId) { + workflowService.deleteWorkflow(workflowId, appId); + return ResultDTO.success(null); + } + @PostMapping(OpenAPIConstant.DISABLE_WORKFLOW) + public ResultDTO disableWorkflow(Long workflowId, Long appId) { + workflowService.disableWorkflow(workflowId, appId); + return ResultDTO.success(null); + } + @PostMapping(OpenAPIConstant.ENABLE_WORKFLOW) + public ResultDTO enableWorkflow(Long workflowId, Long appId) { + workflowService.enableWorkflow(workflowId, appId); + return ResultDTO.success(null); + } + + @PostMapping(OpenAPIConstant.RUN_WORKFLOW) + public ResultDTO runWorkflow(Long workflowId, Long appId) { + return ResultDTO.success(workflowService.runWorkflow(workflowId, appId)); + } + + /* ************* Workflow Instance 区 ************* */ + @PostMapping(OpenAPIConstant.STOP_WORKFLOW_INSTANCE) + public ResultDTO stopWorkflowInstance(Long wfInstance, Long appId) { + workflowInstanceService.stopWorkflowInstance(wfInstance, appId); + return ResultDTO.success(null); + } + @PostMapping(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO) + public ResultDTO fetchWorkflowInstanceInfo(Long wfInstanceId, Long appId) { + return ResultDTO.success(workflowInstanceService.fetchWorkflowInstanceInfo(wfInstanceId, appId)); + } + private void checkInstanceIdValid(Long instanceId, Long appId) { Long realAppId = cacheService.getAppIdByInstanceId(instanceId); if (realAppId == null) { diff --git a/oh-my-scheduler-server/src/main/resources/logback-dev.xml b/oh-my-scheduler-server/src/main/resources/logback-dev.xml index 6ecd910f..517961e8 100644 --- a/oh-my-scheduler-server/src/main/resources/logback-dev.xml +++ b/oh-my-scheduler-server/src/main/resources/logback-dev.xml @@ -20,6 +20,11 @@ + + + + +