[dev] suit WorkflowController

This commit is contained in:
tjq 2020-05-30 13:45:27 +08:00
parent 6fd685cdfc
commit f350f8831c
11 changed files with 53 additions and 18 deletions

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.common.request.http;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import lombok.Data;
@ -27,7 +28,7 @@ public class SaveWorkflowRequest {
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
private String timeExpressionType;
private TimeExpressionType timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
private String timeExpression;
@ -35,7 +36,7 @@ public class SaveWorkflowRequest {
private Integer maxWfInstanceNum;
// ENABLE / DISABLE
private String status;
private boolean enable;
// 工作流整体失败的报警
private List<Long> notifyUserIds;

View File

@ -0,0 +1,21 @@
package com.github.kfcfans.oms.server.common.constans;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 任务实例类型
*
* @author tjq
* @since 2020/5/29
*/
@Getter
@AllArgsConstructor
public enum InstanceType {
NORMAL(1),
WORKFLOW(2);
private final int v;
}

View File

@ -33,6 +33,9 @@ public class InstanceInfoDO {
private Long instanceId;
// 任务实例参数
private String instanceParams;
// 该任务实例的类型普通/工作流InstanceType
private Integer type;
// 该任务实例所属的 workflow ID workflow 任务存在
private Long wfInstanceId;
/**

View File

@ -62,10 +62,10 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
InstanceInfoDO findByInstanceId(long instanceId);
Page<InstanceInfoDO> findByAppId(long appId, Pageable pageable);
Page<InstanceInfoDO> findByJobId(long jobId, Pageable pageable);
Page<InstanceInfoDO> findByAppIdAndType(long appId, int type, Pageable pageable);
Page<InstanceInfoDO> findByJobIdAndType(long jobId, int type, Pageable pageable);
// 只会有一条数据只是为了统一
Page<InstanceInfoDO> findByInstanceId(long instanceId, Pageable pageable);
Page<InstanceInfoDO> findByInstanceIdAndType(long instanceId, int type, Pageable pageable);
// 数据统计
long countByAppIdAndStatus(long appId, int status);

View File

@ -11,6 +11,7 @@ import com.github.kfcfans.oms.common.request.ServerStopInstanceReq;
import com.github.kfcfans.oms.common.response.AskResponse;
import com.github.kfcfans.oms.common.response.InstanceInfoDTO;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.common.constans.InstanceType;
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.oms.server.service.id.IdGenerateService;
@ -61,6 +62,7 @@ public class InstanceService {
newInstanceInfo.setAppId(appId);
newInstanceInfo.setInstanceId(instanceId);
newInstanceInfo.setInstanceParams(instanceParams);
newInstanceInfo.setType(wfInstanceId == null ? InstanceType.NORMAL.getV() : InstanceType.WORKFLOW.getV());
newInstanceInfo.setWfInstanceId(wfInstanceId);
newInstanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());

View File

@ -54,14 +54,13 @@ public class WorkflowService {
BeanUtils.copyProperties(req, wf);
wf.setGmtModified(new Date());
wf.setPeDAG(JsonUtils.toJSONString(req.getPEWorkflowDAG()));
wf.setStatus(SwitchableStatus.valueOf(req.getStatus()).getV());
wf.setTimeExpressionType(TimeExpressionType.valueOf(req.getTimeExpressionType()).getV());
wf.setStatus(req.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
wf.setTimeExpressionType(req.getTimeExpressionType().getV());
wf.setNotifyUserIds(SJ.commaJoiner.join(req.getNotifyUserIds()));
// 计算 NextTriggerTime
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
if (timeExpressionType == TimeExpressionType.CRON) {
if (req.getTimeExpressionType() == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(req.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date());
wf.setNextTriggerTime(nextValidTime.getTime());

View File

@ -113,16 +113,16 @@ public class InstanceController {
// 查询全部数据
if (request.getJobId() == null && request.getInstanceId() == null) {
return ResultDTO.success(convertPage(instanceInfoRepository.findByAppId(request.getAppId(), pageable)));
return ResultDTO.success(convertPage(instanceInfoRepository.findByAppIdAndType(request.getAppId(), request.getType().getV(), pageable)));
}
// 根据JobId查询
if (request.getJobId() != null) {
return ResultDTO.success(convertPage(instanceInfoRepository.findByJobId(request.getJobId(), pageable)));
return ResultDTO.success(convertPage(instanceInfoRepository.findByJobIdAndType(request.getJobId(), request.getType().getV(), pageable)));
}
// 根据InstanceId查询
return ResultDTO.success(convertPage(instanceInfoRepository.findByInstanceId(request.getInstanceId(), pageable)));
return ResultDTO.success(convertPage(instanceInfoRepository.findByInstanceIdAndType(request.getInstanceId(), request.getType().getV(), pageable)));
}
private PageResult<InstanceLogVO> convertPage(Page<InstanceInfoDO> page) {

View File

@ -72,7 +72,7 @@ public class WorkflowController {
}
@GetMapping("/run")
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) throws Exception {
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) {
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId));
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.server.web.request;
import com.github.kfcfans.oms.server.common.constans.InstanceType;
import lombok.Data;
/**
@ -18,7 +19,8 @@ public class QueryInstanceRequest {
// 页大小
private Integer pageSize;
// 查询条件
// 查询条件NORMAL/WORKFLOW
private InstanceType type;
private Long instanceId;
private Long jobId;
}

View File

@ -17,6 +17,8 @@ public class InstanceLogVO {
private String jobName;
// 任务实例IDJS精度丢失
private String instanceId;
// 该任务实例所属的 workflow ID workflow 任务存在
private Long wfInstanceId;
// 执行结果
private String result;

View File

@ -8,6 +8,7 @@ import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import lombok.Data;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.stream.Collectors;
@ -26,6 +27,9 @@ public class WorkflowInfoVO {
private String wfName;
private String wfDescription;
// 所属应用ID
private Long appId;
// 点线表示法
private PEWorkflowDAG pEWorkflowDAG;
@ -39,7 +43,7 @@ public class WorkflowInfoVO {
private Integer maxWfInstanceNum;
// ENABLE / DISABLE
private String status;
private boolean enable;
// 工作流整体失败的报警
private List<Long> notifyUserIds;
@ -48,11 +52,12 @@ public class WorkflowInfoVO {
WorkflowInfoVO vo = new WorkflowInfoVO();
BeanUtils.copyProperties(wfDO, vo);
vo.setStatus(SwitchableStatus.of(wfDO.getStatus()).name());
vo.enable = SwitchableStatus.of(wfDO.getStatus()) == SwitchableStatus.ENABLE;
vo.setTimeExpressionType(TimeExpressionType.of(wfDO.getTimeExpressionType()).name());
vo.setPEWorkflowDAG(JsonUtils.parseObjectUnsafe(wfDO.getPeDAG(), PEWorkflowDAG.class));
vo.setNotifyUserIds(SJ.commaSplitter.splitToList(wfDO.getNotifyUserIds()).stream().map(Long::valueOf).collect(Collectors.toList()));
if (!StringUtils.isEmpty(wfDO.getNotifyUserIds())) {
vo.setNotifyUserIds(SJ.commaSplitter.splitToList(wfDO.getNotifyUserIds()).stream().map(Long::valueOf).collect(Collectors.toList()));
}
return vo;
}
}