diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java index 072eed82..f8835d0f 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/request/http/SaveWorkflowRequest.java @@ -3,6 +3,8 @@ package com.github.kfcfans.oms.common.request.http; import com.github.kfcfans.oms.common.model.PEWorkflowDAG; import lombok.Data; +import java.util.List; + /** * 创建/修改 Workflow 请求 * @@ -25,15 +27,13 @@ public class SaveWorkflowRequest { /* ************************** 定时参数 ************************** */ // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) - private Integer timeExpressionType; + private String timeExpressionType; // 时间表达式,CRON/NULL/LONG/LONG private String timeExpression; - // 1 正常运行,2 停止(不再调度) - private Integer status; + // ENABLE / DISABLE + private String status; // 工作流整体失败的报警 - private String notifyUserIds; - - + private List notifyUserIds; } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/JsonUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/JsonUtils.java index 137d439d..0d9ad291 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/JsonUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/JsonUtils.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.common.utils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.exception.ExceptionUtils; /** * JSON工具类 @@ -44,4 +45,14 @@ public class JsonUtils { public static T parseObject(byte[] b, Class clz) throws Exception { return objectMapper.readValue(b, clz); } + + public static T parseObjectUnsafe(String json, Class clz) { + try { + return objectMapper.readValue(json, clz); + }catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // impossible + return null; + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/SJ.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/SJ.java new file mode 100644 index 00000000..652d4378 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/SJ.java @@ -0,0 +1,17 @@ +package com.github.kfcfans.oms.server.common; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; + +/** + * Splitter & Joiner + * + * @author tjq + * @since 2020/5/27 + */ +public class SJ { + + public static final Splitter commaSplitter = Splitter.on(","); + public static final Joiner commaJoiner = Joiner.on(","); + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/SwitchableStatus.java similarity index 53% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/SwitchableStatus.java index 1dd972a1..9fdf87e6 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/SwitchableStatus.java @@ -4,14 +4,14 @@ import lombok.AllArgsConstructor; import lombok.Getter; /** - * 任务状态 + * 支持开/关的状态,如 任务状态(JobStatus)和工作流状态(WorkflowStatus) * * @author tjq * @since 2020/4/6 */ @Getter @AllArgsConstructor -public enum JobStatus { +public enum SwitchableStatus { ENABLE(1), DISABLE(2), @@ -19,12 +19,12 @@ public enum JobStatus { private int v; - public static JobStatus of(int v) { - for (JobStatus type : values()) { + public static SwitchableStatus of(int v) { + for (SwitchableStatus type : values()) { if (type.v == v) { return type; } } - throw new IllegalArgumentException("unknown JobStatus of " + v); + throw new IllegalArgumentException("unknown SwitchableStatus of " + v); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/JobInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/JobInfoRepository.java index 0d4193f5..e4d82bd6 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/JobInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/JobInfoRepository.java @@ -23,7 +23,7 @@ public interface JobInfoRepository extends JpaRepository { @Query(value = "select id from job_info where app_id in ?1 and status = ?2 and time_expression_type in ?3", nativeQuery = true) List findByAppIdInAndStatusAndTimeExpressionTypeIn(List appIds, int status, List timeTypes); - Page findByAppIdAndStatusNot(Long appId, Pageable pageable, int status); + Page findByAppIdAndStatusNot(Long appId, int status, Pageable pageable); Page findByAppIdAndJobNameLikeAndStatusNot(Long appId, String condition, int status, Pageable pageable); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java index 0978050d..cd2b736d 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInfoRepository.java @@ -1,6 +1,8 @@ package com.github.kfcfans.oms.server.persistence.core.repository; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import java.util.List; @@ -16,4 +18,8 @@ public interface WorkflowInfoRepository extends JpaRepository findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List appIds, int status, int timeExpressionType, long time); + // 对外查询(list)三兄弟 + Page findByAppIdAndStatusNot(Long appId, int nStatus, Pageable pageable); + Page findByIdAndStatusNot(Long id, int nStatus, Pageable pageable); + Page findByAppIdInAndStatusNotAndWfNameLike(Long appId, int nStatus, String condition, Pageable pageable); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java index 18b5bf02..8706c37a 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/JobService.java @@ -4,7 +4,8 @@ import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.oms.common.response.JobInfoDTO; -import com.github.kfcfans.oms.server.common.constans.JobStatus; +import com.github.kfcfans.oms.server.common.SJ; +import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO; @@ -12,7 +13,6 @@ import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRep import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.oms.server.service.id.IdGenerateService; import com.github.kfcfans.oms.server.service.instance.InstanceService; -import com.google.common.base.Joiner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; @@ -45,8 +45,6 @@ public class JobService { @Resource private InstanceInfoRepository instanceInfoRepository; - private static final Joiner commaJoiner = Joiner.on(",").skipNulls(); - /** * 保存/修改任务 * @param request 任务请求 @@ -70,7 +68,7 @@ public class JobService { jobInfoDO.setExecuteType(request.getExecuteType().getV()); jobInfoDO.setProcessorType(request.getProcessorType().getV()); jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV()); - jobInfoDO.setStatus(request.isEnable() ? JobStatus.ENABLE.getV() : JobStatus.DISABLE.getV()); + jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); if (jobInfoDO.getMaxWorkerCount() == null) { jobInfoDO.setMaxInstanceNum(0); @@ -78,7 +76,7 @@ public class JobService { // 转化报警用户列表 if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) { - jobInfoDO.setNotifyUserIds(commaJoiner.join(request.getNotifyUserIds())); + jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds())); } refreshJob(jobInfoDO); @@ -130,14 +128,14 @@ public class JobService { * @param jobId 任务ID */ public void deleteJob(Long jobId) { - shutdownOrStopJob(jobId, JobStatus.DELETED); + shutdownOrStopJob(jobId, SwitchableStatus.DELETED); } /** * 禁用某个任务 */ public void disableJob(Long jobId) { - shutdownOrStopJob(jobId, JobStatus.DISABLE); + shutdownOrStopJob(jobId, SwitchableStatus.DISABLE); } /** @@ -148,7 +146,7 @@ public class JobService { public void enableJob(Long jobId) throws Exception { JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); - jobInfoDO.setStatus(JobStatus.ENABLE.getV()); + jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV()); refreshJob(jobInfoDO); jobInfoRepository.saveAndFlush(jobInfoDO); @@ -158,7 +156,7 @@ public class JobService { * 停止或删除某个JOB * 秒级任务还要额外停止正在运行的任务实例 */ - private void shutdownOrStopJob(Long jobId, JobStatus status) throws IllegalArgumentException { + private void shutdownOrStopJob(Long jobId, SwitchableStatus status) throws IllegalArgumentException { // 1. 先更新 job_info 表 Optional jobInfoOPT = jobInfoRepository.findById(jobId); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index 3b40ee9b..7e7bd90f 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -15,6 +15,7 @@ import com.github.kfcfans.oms.server.service.InstanceLogService; import com.github.kfcfans.oms.server.service.alarm.AlarmContent; import com.github.kfcfans.oms.server.service.alarm.Alarmable; import com.github.kfcfans.oms.server.service.timing.schedule.HashedWheelTimerHolder; +import com.github.kfcfans.oms.server.service.workflow.WorkflowInstanceManager; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; @@ -45,6 +46,7 @@ public class InstanceManager { private static InstanceInfoRepository instanceInfoRepository; private static JobInfoRepository jobInfoRepository; private static Alarmable omsCenterAlarmService; + private static WorkflowInstanceManager workflowInstanceManager; /** * 注册到任务实例管理器 @@ -142,6 +144,11 @@ public class InstanceManager { if (finished) { processFinishedInstance(instanceId, updateEntity.getStatus()); + + // workflow 特殊处理 + if (req.getWfInstanceId() != null) { + getWorkflowInstanceManager().move(req.getWfInstanceId(), instanceId, newStatus == InstanceStatus.SUCCEED, req.getResult()); + } } } @@ -253,4 +260,15 @@ public class InstanceManager { } return omsCenterAlarmService; } + + private static WorkflowInstanceManager getWorkflowInstanceManager() { + while (workflowInstanceManager == null) { + try { + Thread.sleep(100); + }catch (Exception ignore) { + } + workflowInstanceManager = SpringUtils.getBean(WorkflowInstanceManager.class); + } + return workflowInstanceManager; + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java index 0b914161..05777fb3 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java @@ -3,7 +3,7 @@ package com.github.kfcfans.oms.server.service.timing; import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.SystemInstanceResult; import com.github.kfcfans.oms.common.TimeExpressionType; -import com.github.kfcfans.oms.server.common.constans.JobStatus; +import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; @@ -114,10 +114,10 @@ public class InstanceStatusCheckService { JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); - JobStatus jobStatus = JobStatus.of(jobInfoDO.getStatus()); + SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoDO.getStatus()); // 如果任务已关闭,则不进行重试,将任务置为失败即可;秒级任务也直接置为失败,由派发器重新调度 - if (jobStatus != JobStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) { + if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) { updateFailedInstance(instance); return; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/OmsScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/OmsScheduleService.java index 381d4cb4..60384bfb 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/OmsScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/OmsScheduleService.java @@ -3,7 +3,7 @@ package com.github.kfcfans.oms.server.service.timing.schedule; import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.server.akka.OhMyServer; -import com.github.kfcfans.oms.server.common.constans.JobStatus; +import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; @@ -128,7 +128,7 @@ public class OmsScheduleService { try { // 查询条件:任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行 - List jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); + List jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); if (CollectionUtils.isEmpty(jobInfos)) { return; @@ -211,7 +211,7 @@ public class OmsScheduleService { long nowTime = System.currentTimeMillis(); long timeThreshold = nowTime + 2 * SCHEDULE_RATE; Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> { - List wfInfos = workflowInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); + List wfInfos = workflowInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); if (CollectionUtils.isEmpty(wfInfos)) { return; @@ -226,7 +226,7 @@ public class OmsScheduleService { Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> { try { // 查询所有的秒级任务(只包含ID) - List jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.frequentTypes); + List jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.frequentTypes); // 查询日志记录表中是否存在相关的任务 List runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.generalizedRunningStatus); Set runningJobIdSet = Sets.newHashSet(runningJobIdList); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java index ca65deda..c09730ca 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowInstanceManager.java @@ -58,7 +58,7 @@ public class WorkflowInstanceManager { * 提交运行 Workflow 工作流 * @param wfInfo workflow 工作流数据库对象 */ - public void submit(WorkflowInfoDO wfInfo) { + public Long submit(WorkflowInfoDO wfInfo) { Long wfInstanceId = idGenerateService.allocate(); @@ -91,6 +91,7 @@ public class WorkflowInstanceManager { log.error("[WorkflowInstanceManager] submit workflow: {} failed.", wfInfo, e); } workflowInstanceInfoRepository.saveAndFlush(newWfInstance); + return wfInstanceId; } /** @@ -165,7 +166,12 @@ public class WorkflowInstanceManager { jobId2Node.get(jobId).setInstanceId(newInstanceId); }); - wfInstance.setDag(JsonUtils.toJSONStringUnsafe(dag)); + if (allFinished.get()) { + wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV()); + // 最终任务的结果作为整个 workflow 的结果 + wfInstance.setResult(result); + } + wfInstance.setDag(JsonUtils.toJSONString(dag)); workflowInstanceInfoRepository.saveAndFlush(wfInstance); }catch (Exception e) { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java index 81d48555..73989398 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/workflow/WorkflowService.java @@ -5,6 +5,8 @@ import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.oms.common.utils.JsonUtils; import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.oms.server.common.SJ; +import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository; @@ -25,6 +27,8 @@ public class WorkflowService { @Resource private WorkflowInfoRepository workflowInfoRepository; + @Resource + private WorkflowInstanceManager workflowInstanceManager; /** * 保存/修改DAG工作流 @@ -50,9 +54,11 @@ public class WorkflowService { BeanUtils.copyProperties(req, wf); wf.setGmtModified(new Date()); wf.setPeDAG(JsonUtils.toJSONString(req.getPEWorkflowDAG())); + wf.setStatus(SwitchableStatus.valueOf(req.getStatus()).getV()); + wf.setNotifyUserIds(SJ.commaJoiner.join(req.getNotifyUserIds())); // 计算 NextTriggerTime - TimeExpressionType timeExpressionType = TimeExpressionType.of(req.getTimeExpressionType()); + TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); if (timeExpressionType == TimeExpressionType.CRON) { CronExpression cronExpression = new CronExpression(req.getTimeExpression()); Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date()); @@ -63,4 +69,46 @@ public class WorkflowService { return newEntity.getId(); } + /** + * 删除工作流(软删除) + * @param wfId 工作流ID + * @param appId 所属应用ID + */ + public void deleteWorkflow(Long wfId, Long appId) { + WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); + wfInfo.setStatus(SwitchableStatus.DELETED.getV()); + wfInfo.setGmtModified(new Date()); + workflowInfoRepository.saveAndFlush(wfInfo); + } + + /** + * 禁用工作流 + * @param wfId 工作流ID + * @param appId 所属应用ID + */ + public void disableWorkflow(Long wfId, Long appId) { + WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); + wfInfo.setStatus(SwitchableStatus.DISABLE.getV()); + wfInfo.setGmtModified(new Date()); + workflowInfoRepository.saveAndFlush(wfInfo); + } + + /** + * 立即运行工作流 + * @param wfId 工作流ID + * @param appId 所属应用ID + * @return 该 workflow 实例的 instanceId(wfInstanceId) + */ + public Long runWorkflow(Long wfId, Long appId) { + WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); + return workflowInstanceManager.submit(wfInfo); + } + + private WorkflowInfoDO permissionCheck(Long wfId, Long appId) { + WorkflowInfoDO wfInfo = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id: " + wfId)); + if (!wfInfo.getAppId().equals(appId)) { + throw new OmsException("Permission Denied!can't delete other appId's workflow!"); + } + return wfInfo; + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java index dbda246f..0b2cecf5 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java @@ -3,7 +3,7 @@ package com.github.kfcfans.oms.server.web.controller; import com.github.kfcfans.oms.common.ExecuteType; import com.github.kfcfans.oms.common.ProcessorType; import com.github.kfcfans.oms.common.TimeExpressionType; -import com.github.kfcfans.oms.server.common.constans.JobStatus; +import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; import com.github.kfcfans.oms.server.persistence.PageResult; import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.oms.common.response.ResultDTO; @@ -77,7 +77,7 @@ public class JobController { // 无查询条件,查询全部 if (request.getJobId() == null && StringUtils.isEmpty(request.getKeyword())) { - jobInfoPage = jobInfoRepository.findByAppIdAndStatusNot(request.getAppId(), pageRequest, JobStatus.DELETED.getV()); + jobInfoPage = jobInfoRepository.findByAppIdAndStatusNot(request.getAppId(), SwitchableStatus.DELETED.getV(), pageRequest); return ResultDTO.success(convertPage(jobInfoPage)); } @@ -104,7 +104,7 @@ public class JobController { // 模糊查询 String condition = "%" + request.getKeyword() + "%"; - jobInfoPage = jobInfoRepository.findByAppIdAndJobNameLikeAndStatusNot(request.getAppId(), condition, JobStatus.DELETED.getV(), pageRequest); + jobInfoPage = jobInfoRepository.findByAppIdAndJobNameLikeAndStatusNot(request.getAppId(), condition, SwitchableStatus.DELETED.getV(), pageRequest); return ResultDTO.success(convertPage(jobInfoPage)); } @@ -130,7 +130,7 @@ public class JobController { jobInfoVO.setTimeExpressionType(timeExpressionType.name()); jobInfoVO.setExecuteType(executeType.name()); jobInfoVO.setProcessorType(processorType.name()); - jobInfoVO.setEnable(jobInfoDO.getStatus() == JobStatus.ENABLE.getV()); + jobInfoVO.setEnable(jobInfoDO.getStatus() == SwitchableStatus.ENABLE.getV()); if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) { jobInfoVO.setNotifyUserIds(commaSplitter.splitToList(jobInfoDO.getNotifyUserIds())); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java index 43c8c85b..57beb151 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/WorkflowController.java @@ -2,13 +2,21 @@ package com.github.kfcfans.oms.server.web.controller; import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.oms.common.response.ResultDTO; +import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; +import com.github.kfcfans.oms.server.persistence.PageResult; +import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; +import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository; import com.github.kfcfans.oms.server.service.workflow.WorkflowService; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import com.github.kfcfans.oms.server.web.request.QueryWorkflowInfoRequest; +import com.github.kfcfans.oms.server.web.response.WorkflowInfoVO; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Sort; +import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; +import java.util.stream.Collectors; /** * 工作流控制器 @@ -22,10 +30,57 @@ public class WorkflowController { @Resource private WorkflowService workflowService; + @Resource + private WorkflowInfoRepository workflowInfoRepository; @PostMapping("/save") public ResultDTO save(@RequestBody SaveWorkflowRequest req) throws Exception { return ResultDTO.success(workflowService.saveWorkflow(req)); } + @GetMapping("/disable") + public ResultDTO disableWorkflow(Long workflowId, Long appId) { + workflowService.disableWorkflow(workflowId, appId); + return ResultDTO.success(null); + } + + @GetMapping("/delete") + public ResultDTO deleteWorkflow(Long workflowId, Long appId) { + workflowService.deleteWorkflow(workflowId, appId); + return ResultDTO.success(null); + } + + @PostMapping("/list") + public ResultDTO> list(@RequestBody QueryWorkflowInfoRequest req) { + + Sort sort = Sort.by(Sort.Direction.DESC, "gmtCreate"); + PageRequest pageRequest = PageRequest.of(req.getIndex(), req.getPageSize(), sort); + Page wfPage; + + // 排除已删除数据 + int nStatus = SwitchableStatus.DELETED.getV(); + // 无查询条件,查询全部 + if (req.getWorkflowId() == null && StringUtils.isEmpty(req.getKeyword())) { + wfPage = workflowInfoRepository.findByAppIdAndStatusNot(req.getAppId(), nStatus, pageRequest); + }else if (req.getWorkflowId() != null) { + wfPage = workflowInfoRepository.findByIdAndStatusNot(req.getWorkflowId(), nStatus, pageRequest); + }else { + String condition = "%" + req.getKeyword() + "%"; + wfPage = workflowInfoRepository.findByAppIdInAndStatusNotAndWfNameLike(req.getAppId(), nStatus, condition, pageRequest); + } + return ResultDTO.success(convertPage(wfPage)); + } + + @GetMapping("/run") + public ResultDTO runWorkflow(Long workflowId, Long appId) { + return ResultDTO.success(workflowService.runWorkflow(workflowId, appId)); + } + + private static PageResult convertPage(Page originPage) { + + PageResult newPage = new PageResult<>(originPage); + newPage.setData(originPage.getContent().stream().map(WorkflowInfoVO::from).collect(Collectors.toList())); + return newPage; + } + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/QueryWorkflowInfoRequest.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/QueryWorkflowInfoRequest.java new file mode 100644 index 00000000..2748b271 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/QueryWorkflowInfoRequest.java @@ -0,0 +1,25 @@ +package com.github.kfcfans.oms.server.web.request; + +import lombok.Data; + +/** + * 查询工作流 + * + * @author tjq + * @since 2020/5/27 + */ +@Data +public class QueryWorkflowInfoRequest { + + // 任务所属应用ID + private Long appId; + // 当前页码 + private Integer index; + // 页大小 + private Integer pageSize; + + // 查询条件 + private Long workflowId; + private String keyword; + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInfoVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInfoVO.java new file mode 100644 index 00000000..75f38c07 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkflowInfoVO.java @@ -0,0 +1,55 @@ +package com.github.kfcfans.oms.server.web.response; + +import com.github.kfcfans.oms.common.TimeExpressionType; +import com.github.kfcfans.oms.common.model.PEWorkflowDAG; +import com.github.kfcfans.oms.common.utils.JsonUtils; +import com.github.kfcfans.oms.server.common.SJ; +import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; +import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; +import lombok.Data; +import org.springframework.beans.BeanUtils; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * 工作流对外展示对象 + * + * @author tjq + * @since 2020/5/27 + */ +@Data +public class WorkflowInfoVO { + + private Long id; + + private String wfName; + private String wfDescription; + + // 点线表示法 + private PEWorkflowDAG pEWorkflowDAG; + + /* ************************** 定时参数 ************************** */ + // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) + private String timeExpressionType; + // 时间表达式,CRON/NULL/LONG/LONG + private String timeExpression; + + // ENABLE / DISABLE + private String status; + + // 工作流整体失败的报警 + private List notifyUserIds; + + public static WorkflowInfoVO from(WorkflowInfoDO wfDO) { + WorkflowInfoVO vo = new WorkflowInfoVO(); + BeanUtils.copyProperties(wfDO, vo); + + vo.setStatus(SwitchableStatus.of(wfDO.getStatus()).name()); + vo.setTimeExpressionType(TimeExpressionType.of(wfDO.getTimeExpressionType()).name()); + vo.setPEWorkflowDAG(JsonUtils.parseObjectUnsafe(wfDO.getPeDAG(), PEWorkflowDAG.class)); + vo.setNotifyUserIds(SJ.commaSplitter.splitToList(wfDO.getNotifyUserIds()).stream().map(Long::valueOf).collect(Collectors.toList())); + + return vo; + } +} diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java index 62770201..8578dac3 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java @@ -2,7 +2,7 @@ package com.github.kfcfans.oms.server.test; import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.utils.NetUtils; -import com.github.kfcfans.oms.server.common.constans.JobStatus; +import com.github.kfcfans.oms.server.common.constans.SwitchableStatus; import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.OmsLockDO; @@ -54,7 +54,7 @@ public class RepositoryTest { @Test public void testSelectCronJobSQL() { - List result = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(Lists.newArrayList(1L), JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), System.currentTimeMillis()); + List result = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(Lists.newArrayList(1L), SwitchableStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), System.currentTimeMillis()); System.out.println(result); }