[dev] finished workflow's crud

This commit is contained in:
tjq 2020-05-27 13:38:46 +08:00
parent 955713e6c6
commit d89fb000fa
17 changed files with 281 additions and 42 deletions

View File

@ -3,6 +3,8 @@ package com.github.kfcfans.oms.common.request.http;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG; import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import lombok.Data; import lombok.Data;
import java.util.List;
/** /**
* 创建/修改 Workflow 请求 * 创建/修改 Workflow 请求
* *
@ -25,15 +27,13 @@ public class SaveWorkflowRequest {
/* ************************** 定时参数 ************************** */ /* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY // 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
private Integer timeExpressionType; private String timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG // 时间表达式CRON/NULL/LONG/LONG
private String timeExpression; private String timeExpression;
// 1 正常运行2 停止不再调度 // ENABLE / DISABLE
private Integer status; private String status;
// 工作流整体失败的报警 // 工作流整体失败的报警
private String notifyUserIds; private List<Long> notifyUserIds;
} }

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.oms.common.utils;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.exception.ExceptionUtils;
/** /**
* JSON工具类 * JSON工具类
@ -44,4 +45,14 @@ public class JsonUtils {
public static <T> T parseObject(byte[] b, Class<T> clz) throws Exception { public static <T> T parseObject(byte[] b, Class<T> clz) throws Exception {
return objectMapper.readValue(b, clz); return objectMapper.readValue(b, clz);
} }
public static <T> T parseObjectUnsafe(String json, Class<T> clz) {
try {
return objectMapper.readValue(json, clz);
}catch (Exception e) {
ExceptionUtils.rethrow(e);
}
// impossible
return null;
}
} }

View File

@ -0,0 +1,17 @@
package com.github.kfcfans.oms.server.common;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
/**
* Splitter & Joiner
*
* @author tjq
* @since 2020/5/27
*/
public class SJ {
public static final Splitter commaSplitter = Splitter.on(",");
public static final Joiner commaJoiner = Joiner.on(",");
}

View File

@ -4,14 +4,14 @@ import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
/** /**
* 任务状态 * 支持开/关的状态 任务状态JobStatus和工作流状态WorkflowStatus
* *
* @author tjq * @author tjq
* @since 2020/4/6 * @since 2020/4/6
*/ */
@Getter @Getter
@AllArgsConstructor @AllArgsConstructor
public enum JobStatus { public enum SwitchableStatus {
ENABLE(1), ENABLE(1),
DISABLE(2), DISABLE(2),
@ -19,12 +19,12 @@ public enum JobStatus {
private int v; private int v;
public static JobStatus of(int v) { public static SwitchableStatus of(int v) {
for (JobStatus type : values()) { for (SwitchableStatus type : values()) {
if (type.v == v) { if (type.v == v) {
return type; return type;
} }
} }
throw new IllegalArgumentException("unknown JobStatus of " + v); throw new IllegalArgumentException("unknown SwitchableStatus of " + v);
} }
} }

View File

@ -23,7 +23,7 @@ public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
@Query(value = "select id from job_info where app_id in ?1 and status = ?2 and time_expression_type in ?3", nativeQuery = true) @Query(value = "select id from job_info where app_id in ?1 and status = ?2 and time_expression_type in ?3", nativeQuery = true)
List<Long> findByAppIdInAndStatusAndTimeExpressionTypeIn(List<Long> appIds, int status, List<Integer> timeTypes); List<Long> findByAppIdInAndStatusAndTimeExpressionTypeIn(List<Long> appIds, int status, List<Integer> timeTypes);
Page<JobInfoDO> findByAppIdAndStatusNot(Long appId, Pageable pageable, int status); Page<JobInfoDO> findByAppIdAndStatusNot(Long appId, int status, Pageable pageable);
Page<JobInfoDO> findByAppIdAndJobNameLikeAndStatusNot(Long appId, String condition, int status, Pageable pageable); Page<JobInfoDO> findByAppIdAndJobNameLikeAndStatusNot(Long appId, String condition, int status, Pageable pageable);

View File

@ -1,6 +1,8 @@
package com.github.kfcfans.oms.server.persistence.core.repository; package com.github.kfcfans.oms.server.persistence.core.repository;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List; import java.util.List;
@ -16,4 +18,8 @@ public interface WorkflowInfoRepository extends JpaRepository<WorkflowInfoDO, Lo
List<WorkflowInfoDO> findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List<Long> appIds, int status, int timeExpressionType, long time); List<WorkflowInfoDO> findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List<Long> appIds, int status, int timeExpressionType, long time);
// 对外查询list三兄弟
Page<WorkflowInfoDO> findByAppIdAndStatusNot(Long appId, int nStatus, Pageable pageable);
Page<WorkflowInfoDO> findByIdAndStatusNot(Long id, int nStatus, Pageable pageable);
Page<WorkflowInfoDO> findByAppIdInAndStatusNotAndWfNameLike(Long appId, int nStatus, String condition, Pageable pageable);
} }

View File

@ -4,7 +4,8 @@ import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.oms.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.oms.common.response.JobInfoDTO; import com.github.kfcfans.oms.common.response.JobInfoDTO;
import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.SJ;
import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
@ -12,7 +13,6 @@ import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRep
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.service.id.IdGenerateService; import com.github.kfcfans.oms.server.service.id.IdGenerateService;
import com.github.kfcfans.oms.server.service.instance.InstanceService; import com.github.kfcfans.oms.server.service.instance.InstanceService;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -45,8 +45,6 @@ public class JobService {
@Resource @Resource
private InstanceInfoRepository instanceInfoRepository; private InstanceInfoRepository instanceInfoRepository;
private static final Joiner commaJoiner = Joiner.on(",").skipNulls();
/** /**
* 保存/修改任务 * 保存/修改任务
* @param request 任务请求 * @param request 任务请求
@ -70,7 +68,7 @@ public class JobService {
jobInfoDO.setExecuteType(request.getExecuteType().getV()); jobInfoDO.setExecuteType(request.getExecuteType().getV());
jobInfoDO.setProcessorType(request.getProcessorType().getV()); jobInfoDO.setProcessorType(request.getProcessorType().getV());
jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV()); jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV());
jobInfoDO.setStatus(request.isEnable() ? JobStatus.ENABLE.getV() : JobStatus.DISABLE.getV()); jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
if (jobInfoDO.getMaxWorkerCount() == null) { if (jobInfoDO.getMaxWorkerCount() == null) {
jobInfoDO.setMaxInstanceNum(0); jobInfoDO.setMaxInstanceNum(0);
@ -78,7 +76,7 @@ public class JobService {
// 转化报警用户列表 // 转化报警用户列表
if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) { if (!CollectionUtils.isEmpty(request.getNotifyUserIds())) {
jobInfoDO.setNotifyUserIds(commaJoiner.join(request.getNotifyUserIds())); jobInfoDO.setNotifyUserIds(SJ.commaJoiner.join(request.getNotifyUserIds()));
} }
refreshJob(jobInfoDO); refreshJob(jobInfoDO);
@ -130,14 +128,14 @@ public class JobService {
* @param jobId 任务ID * @param jobId 任务ID
*/ */
public void deleteJob(Long jobId) { public void deleteJob(Long jobId) {
shutdownOrStopJob(jobId, JobStatus.DELETED); shutdownOrStopJob(jobId, SwitchableStatus.DELETED);
} }
/** /**
* 禁用某个任务 * 禁用某个任务
*/ */
public void disableJob(Long jobId) { public void disableJob(Long jobId) {
shutdownOrStopJob(jobId, JobStatus.DISABLE); shutdownOrStopJob(jobId, SwitchableStatus.DISABLE);
} }
/** /**
@ -148,7 +146,7 @@ public class JobService {
public void enableJob(Long jobId) throws Exception { public void enableJob(Long jobId) throws Exception {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
jobInfoDO.setStatus(JobStatus.ENABLE.getV()); jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());
refreshJob(jobInfoDO); refreshJob(jobInfoDO);
jobInfoRepository.saveAndFlush(jobInfoDO); jobInfoRepository.saveAndFlush(jobInfoDO);
@ -158,7 +156,7 @@ public class JobService {
* 停止或删除某个JOB * 停止或删除某个JOB
* 秒级任务还要额外停止正在运行的任务实例 * 秒级任务还要额外停止正在运行的任务实例
*/ */
private void shutdownOrStopJob(Long jobId, JobStatus status) throws IllegalArgumentException { private void shutdownOrStopJob(Long jobId, SwitchableStatus status) throws IllegalArgumentException {
// 1. 先更新 job_info // 1. 先更新 job_info
Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId); Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);

View File

@ -15,6 +15,7 @@ import com.github.kfcfans.oms.server.service.InstanceLogService;
import com.github.kfcfans.oms.server.service.alarm.AlarmContent; import com.github.kfcfans.oms.server.service.alarm.AlarmContent;
import com.github.kfcfans.oms.server.service.alarm.Alarmable; import com.github.kfcfans.oms.server.service.alarm.Alarmable;
import com.github.kfcfans.oms.server.service.timing.schedule.HashedWheelTimerHolder; import com.github.kfcfans.oms.server.service.timing.schedule.HashedWheelTimerHolder;
import com.github.kfcfans.oms.server.service.workflow.WorkflowInstanceManager;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
@ -45,6 +46,7 @@ public class InstanceManager {
private static InstanceInfoRepository instanceInfoRepository; private static InstanceInfoRepository instanceInfoRepository;
private static JobInfoRepository jobInfoRepository; private static JobInfoRepository jobInfoRepository;
private static Alarmable omsCenterAlarmService; private static Alarmable omsCenterAlarmService;
private static WorkflowInstanceManager workflowInstanceManager;
/** /**
* 注册到任务实例管理器 * 注册到任务实例管理器
@ -142,6 +144,11 @@ public class InstanceManager {
if (finished) { if (finished) {
processFinishedInstance(instanceId, updateEntity.getStatus()); processFinishedInstance(instanceId, updateEntity.getStatus());
// workflow 特殊处理
if (req.getWfInstanceId() != null) {
getWorkflowInstanceManager().move(req.getWfInstanceId(), instanceId, newStatus == InstanceStatus.SUCCEED, req.getResult());
}
} }
} }
@ -253,4 +260,15 @@ public class InstanceManager {
} }
return omsCenterAlarmService; return omsCenterAlarmService;
} }
private static WorkflowInstanceManager getWorkflowInstanceManager() {
while (workflowInstanceManager == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
workflowInstanceManager = SpringUtils.getBean(WorkflowInstanceManager.class);
}
return workflowInstanceManager;
}
} }

View File

@ -3,7 +3,7 @@ package com.github.kfcfans.oms.server.service.timing;
import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.SystemInstanceResult; import com.github.kfcfans.oms.common.SystemInstanceResult;
import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
@ -114,10 +114,10 @@ public class InstanceStatusCheckService {
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new);
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
JobStatus jobStatus = JobStatus.of(jobInfoDO.getStatus()); SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoDO.getStatus());
// 如果任务已关闭则不进行重试将任务置为失败即可秒级任务也直接置为失败由派发器重新调度 // 如果任务已关闭则不进行重试将任务置为失败即可秒级任务也直接置为失败由派发器重新调度
if (jobStatus != JobStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) { if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) {
updateFailedInstance(instance); updateFailedInstance(instance);
return; return;
} }

View File

@ -3,7 +3,7 @@ package com.github.kfcfans.oms.server.service.timing.schedule;
import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
@ -128,7 +128,7 @@ public class OmsScheduleService {
try { try {
// 查询条件任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行 // 查询条件任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行
List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold);
if (CollectionUtils.isEmpty(jobInfos)) { if (CollectionUtils.isEmpty(jobInfos)) {
return; return;
@ -211,7 +211,7 @@ public class OmsScheduleService {
long nowTime = System.currentTimeMillis(); long nowTime = System.currentTimeMillis();
long timeThreshold = nowTime + 2 * SCHEDULE_RATE; long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> { Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> {
List<WorkflowInfoDO> wfInfos = workflowInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold); List<WorkflowInfoDO> wfInfos = workflowInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold);
if (CollectionUtils.isEmpty(wfInfos)) { if (CollectionUtils.isEmpty(wfInfos)) {
return; return;
@ -226,7 +226,7 @@ public class OmsScheduleService {
Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> { Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> {
try { try {
// 查询所有的秒级任务只包含ID // 查询所有的秒级任务只包含ID
List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.frequentTypes); List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.frequentTypes);
// 查询日志记录表中是否存在相关的任务 // 查询日志记录表中是否存在相关的任务
List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.generalizedRunningStatus); List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.generalizedRunningStatus);
Set<Long> runningJobIdSet = Sets.newHashSet(runningJobIdList); Set<Long> runningJobIdSet = Sets.newHashSet(runningJobIdList);

View File

@ -58,7 +58,7 @@ public class WorkflowInstanceManager {
* 提交运行 Workflow 工作流 * 提交运行 Workflow 工作流
* @param wfInfo workflow 工作流数据库对象 * @param wfInfo workflow 工作流数据库对象
*/ */
public void submit(WorkflowInfoDO wfInfo) { public Long submit(WorkflowInfoDO wfInfo) {
Long wfInstanceId = idGenerateService.allocate(); Long wfInstanceId = idGenerateService.allocate();
@ -91,6 +91,7 @@ public class WorkflowInstanceManager {
log.error("[WorkflowInstanceManager] submit workflow: {} failed.", wfInfo, e); log.error("[WorkflowInstanceManager] submit workflow: {} failed.", wfInfo, e);
} }
workflowInstanceInfoRepository.saveAndFlush(newWfInstance); workflowInstanceInfoRepository.saveAndFlush(newWfInstance);
return wfInstanceId;
} }
/** /**
@ -165,7 +166,12 @@ public class WorkflowInstanceManager {
jobId2Node.get(jobId).setInstanceId(newInstanceId); jobId2Node.get(jobId).setInstanceId(newInstanceId);
}); });
wfInstance.setDag(JsonUtils.toJSONStringUnsafe(dag)); if (allFinished.get()) {
wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
// 最终任务的结果作为整个 workflow 的结果
wfInstance.setResult(result);
}
wfInstance.setDag(JsonUtils.toJSONString(dag));
workflowInstanceInfoRepository.saveAndFlush(wfInstance); workflowInstanceInfoRepository.saveAndFlush(wfInstance);
}catch (Exception e) { }catch (Exception e) {

View File

@ -5,6 +5,8 @@ import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.oms.common.utils.JsonUtils; import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils; import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.common.SJ;
import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository;
@ -25,6 +27,8 @@ public class WorkflowService {
@Resource @Resource
private WorkflowInfoRepository workflowInfoRepository; private WorkflowInfoRepository workflowInfoRepository;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
/** /**
* 保存/修改DAG工作流 * 保存/修改DAG工作流
@ -50,9 +54,11 @@ public class WorkflowService {
BeanUtils.copyProperties(req, wf); BeanUtils.copyProperties(req, wf);
wf.setGmtModified(new Date()); wf.setGmtModified(new Date());
wf.setPeDAG(JsonUtils.toJSONString(req.getPEWorkflowDAG())); wf.setPeDAG(JsonUtils.toJSONString(req.getPEWorkflowDAG()));
wf.setStatus(SwitchableStatus.valueOf(req.getStatus()).getV());
wf.setNotifyUserIds(SJ.commaJoiner.join(req.getNotifyUserIds()));
// 计算 NextTriggerTime // 计算 NextTriggerTime
TimeExpressionType timeExpressionType = TimeExpressionType.of(req.getTimeExpressionType()); TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
if (timeExpressionType == TimeExpressionType.CRON) { if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(req.getTimeExpression()); CronExpression cronExpression = new CronExpression(req.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date()); Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date());
@ -63,4 +69,46 @@ public class WorkflowService {
return newEntity.getId(); return newEntity.getId();
} }
/**
* 删除工作流软删除
* @param wfId 工作流ID
* @param appId 所属应用ID
*/
public void deleteWorkflow(Long wfId, Long appId) {
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
wfInfo.setStatus(SwitchableStatus.DELETED.getV());
wfInfo.setGmtModified(new Date());
workflowInfoRepository.saveAndFlush(wfInfo);
}
/**
* 禁用工作流
* @param wfId 工作流ID
* @param appId 所属应用ID
*/
public void disableWorkflow(Long wfId, Long appId) {
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
wfInfo.setStatus(SwitchableStatus.DISABLE.getV());
wfInfo.setGmtModified(new Date());
workflowInfoRepository.saveAndFlush(wfInfo);
}
/**
* 立即运行工作流
* @param wfId 工作流ID
* @param appId 所属应用ID
* @return workflow 实例的 instanceIdwfInstanceId
*/
public Long runWorkflow(Long wfId, Long appId) {
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
return workflowInstanceManager.submit(wfInfo);
}
private WorkflowInfoDO permissionCheck(Long wfId, Long appId) {
WorkflowInfoDO wfInfo = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id: " + wfId));
if (!wfInfo.getAppId().equals(appId)) {
throw new OmsException("Permission Denied!can't delete other appId's workflow!");
}
return wfInfo;
}
} }

View File

@ -3,7 +3,7 @@ package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.common.ExecuteType; import com.github.kfcfans.oms.common.ExecuteType;
import com.github.kfcfans.oms.common.ProcessorType; import com.github.kfcfans.oms.common.ProcessorType;
import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.persistence.PageResult; import com.github.kfcfans.oms.server.persistence.PageResult;
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.oms.common.response.ResultDTO; import com.github.kfcfans.oms.common.response.ResultDTO;
@ -77,7 +77,7 @@ public class JobController {
// 无查询条件查询全部 // 无查询条件查询全部
if (request.getJobId() == null && StringUtils.isEmpty(request.getKeyword())) { if (request.getJobId() == null && StringUtils.isEmpty(request.getKeyword())) {
jobInfoPage = jobInfoRepository.findByAppIdAndStatusNot(request.getAppId(), pageRequest, JobStatus.DELETED.getV()); jobInfoPage = jobInfoRepository.findByAppIdAndStatusNot(request.getAppId(), SwitchableStatus.DELETED.getV(), pageRequest);
return ResultDTO.success(convertPage(jobInfoPage)); return ResultDTO.success(convertPage(jobInfoPage));
} }
@ -104,7 +104,7 @@ public class JobController {
// 模糊查询 // 模糊查询
String condition = "%" + request.getKeyword() + "%"; String condition = "%" + request.getKeyword() + "%";
jobInfoPage = jobInfoRepository.findByAppIdAndJobNameLikeAndStatusNot(request.getAppId(), condition, JobStatus.DELETED.getV(), pageRequest); jobInfoPage = jobInfoRepository.findByAppIdAndJobNameLikeAndStatusNot(request.getAppId(), condition, SwitchableStatus.DELETED.getV(), pageRequest);
return ResultDTO.success(convertPage(jobInfoPage)); return ResultDTO.success(convertPage(jobInfoPage));
} }
@ -130,7 +130,7 @@ public class JobController {
jobInfoVO.setTimeExpressionType(timeExpressionType.name()); jobInfoVO.setTimeExpressionType(timeExpressionType.name());
jobInfoVO.setExecuteType(executeType.name()); jobInfoVO.setExecuteType(executeType.name());
jobInfoVO.setProcessorType(processorType.name()); jobInfoVO.setProcessorType(processorType.name());
jobInfoVO.setEnable(jobInfoDO.getStatus() == JobStatus.ENABLE.getV()); jobInfoVO.setEnable(jobInfoDO.getStatus() == SwitchableStatus.ENABLE.getV());
if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) { if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) {
jobInfoVO.setNotifyUserIds(commaSplitter.splitToList(jobInfoDO.getNotifyUserIds())); jobInfoVO.setNotifyUserIds(commaSplitter.splitToList(jobInfoDO.getNotifyUserIds()));

View File

@ -2,13 +2,21 @@ package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.oms.common.response.ResultDTO; import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.persistence.PageResult;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.oms.server.service.workflow.WorkflowService; import com.github.kfcfans.oms.server.service.workflow.WorkflowService;
import org.springframework.web.bind.annotation.PostMapping; import com.github.kfcfans.oms.server.web.request.QueryWorkflowInfoRequest;
import org.springframework.web.bind.annotation.RequestBody; import com.github.kfcfans.oms.server.web.response.WorkflowInfoVO;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.data.domain.Page;
import org.springframework.web.bind.annotation.RestController; import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.stream.Collectors;
/** /**
* 工作流控制器 * 工作流控制器
@ -22,10 +30,57 @@ public class WorkflowController {
@Resource @Resource
private WorkflowService workflowService; private WorkflowService workflowService;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@PostMapping("/save") @PostMapping("/save")
public ResultDTO<Long> save(@RequestBody SaveWorkflowRequest req) throws Exception { public ResultDTO<Long> save(@RequestBody SaveWorkflowRequest req) throws Exception {
return ResultDTO.success(workflowService.saveWorkflow(req)); return ResultDTO.success(workflowService.saveWorkflow(req));
} }
@GetMapping("/disable")
public ResultDTO<Void> disableWorkflow(Long workflowId, Long appId) {
workflowService.disableWorkflow(workflowId, appId);
return ResultDTO.success(null);
}
@GetMapping("/delete")
public ResultDTO<Void> deleteWorkflow(Long workflowId, Long appId) {
workflowService.deleteWorkflow(workflowId, appId);
return ResultDTO.success(null);
}
@PostMapping("/list")
public ResultDTO<PageResult<WorkflowInfoVO>> list(@RequestBody QueryWorkflowInfoRequest req) {
Sort sort = Sort.by(Sort.Direction.DESC, "gmtCreate");
PageRequest pageRequest = PageRequest.of(req.getIndex(), req.getPageSize(), sort);
Page<WorkflowInfoDO> wfPage;
// 排除已删除数据
int nStatus = SwitchableStatus.DELETED.getV();
// 无查询条件查询全部
if (req.getWorkflowId() == null && StringUtils.isEmpty(req.getKeyword())) {
wfPage = workflowInfoRepository.findByAppIdAndStatusNot(req.getAppId(), nStatus, pageRequest);
}else if (req.getWorkflowId() != null) {
wfPage = workflowInfoRepository.findByIdAndStatusNot(req.getWorkflowId(), nStatus, pageRequest);
}else {
String condition = "%" + req.getKeyword() + "%";
wfPage = workflowInfoRepository.findByAppIdInAndStatusNotAndWfNameLike(req.getAppId(), nStatus, condition, pageRequest);
}
return ResultDTO.success(convertPage(wfPage));
}
@GetMapping("/run")
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) {
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId));
}
private static PageResult<WorkflowInfoVO> convertPage(Page<WorkflowInfoDO> originPage) {
PageResult<WorkflowInfoVO> newPage = new PageResult<>(originPage);
newPage.setData(originPage.getContent().stream().map(WorkflowInfoVO::from).collect(Collectors.toList()));
return newPage;
}
} }

View File

@ -0,0 +1,25 @@
package com.github.kfcfans.oms.server.web.request;
import lombok.Data;
/**
* 查询工作流
*
* @author tjq
* @since 2020/5/27
*/
@Data
public class QueryWorkflowInfoRequest {
// 任务所属应用ID
private Long appId;
// 当前页码
private Integer index;
// 页大小
private Integer pageSize;
// 查询条件
private Long workflowId;
private String keyword;
}

View File

@ -0,0 +1,55 @@
package com.github.kfcfans.oms.server.web.response;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.server.common.SJ;
import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import lombok.Data;
import org.springframework.beans.BeanUtils;
import java.util.List;
import java.util.stream.Collectors;
/**
* 工作流对外展示对象
*
* @author tjq
* @since 2020/5/27
*/
@Data
public class WorkflowInfoVO {
private Long id;
private String wfName;
private String wfDescription;
// 点线表示法
private PEWorkflowDAG pEWorkflowDAG;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
private String timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
private String timeExpression;
// ENABLE / DISABLE
private String status;
// 工作流整体失败的报警
private List<Long> notifyUserIds;
public static WorkflowInfoVO from(WorkflowInfoDO wfDO) {
WorkflowInfoVO vo = new WorkflowInfoVO();
BeanUtils.copyProperties(wfDO, vo);
vo.setStatus(SwitchableStatus.of(wfDO.getStatus()).name());
vo.setTimeExpressionType(TimeExpressionType.of(wfDO.getTimeExpressionType()).name());
vo.setPEWorkflowDAG(JsonUtils.parseObjectUnsafe(wfDO.getPeDAG(), PEWorkflowDAG.class));
vo.setNotifyUserIds(SJ.commaSplitter.splitToList(wfDO.getNotifyUserIds()).stream().map(Long::valueOf).collect(Collectors.toList()));
return vo;
}
}

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.oms.server.test;
import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.utils.NetUtils; import com.github.kfcfans.oms.common.utils.NetUtils;
import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.OmsLockDO; import com.github.kfcfans.oms.server.persistence.core.model.OmsLockDO;
@ -54,7 +54,7 @@ public class RepositoryTest {
@Test @Test
public void testSelectCronJobSQL() { public void testSelectCronJobSQL() {
List<JobInfoDO> result = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(Lists.newArrayList(1L), JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), System.currentTimeMillis()); List<JobInfoDO> result = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(Lists.newArrayList(1L), SwitchableStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), System.currentTimeMillis());
System.out.println(result); System.out.println(result);
} }