[dev] WorkflowInstanceController

This commit is contained in:
tjq 2020-05-31 13:54:06 +08:00
parent f350f8831c
commit aa805062b1
19 changed files with 325 additions and 41 deletions

View File

@ -0,0 +1,15 @@
package com.github.kfcfans.oms.common;
/**
* 公共常量
*
* @author tjq
* @since 2020/5/31
*/
public class OmsConstant {
public static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
public static final String TIME_PATTERN_PLUS = "yyyy-MM-dd HH:mm:ss.SSS";
public static final String NONE = "N/A";
}

View File

@ -14,7 +14,6 @@ import java.util.List;
*/ */
@Getter @Getter
@AllArgsConstructor @AllArgsConstructor
public enum WorkflowInstanceStatus { public enum WorkflowInstanceStatus {
WAITING(1, "等待调度"), WAITING(1, "等待调度"),

View File

@ -29,6 +29,11 @@ public class PEWorkflowDAG {
public static class Node { public static class Node {
private Long jobId; private Long jobId;
private String jobName; private String jobName;
// 仅向前端输出时需要
private Long instanceId;
private boolean finished;
private String result;
} }
// jobId -> jobId // jobId -> jobId

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.oms.common.utils;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.kfcfans.oms.common.OmsException;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
/** /**
@ -52,7 +53,6 @@ public class JsonUtils {
}catch (Exception e) { }catch (Exception e) {
ExceptionUtils.rethrow(e); ExceptionUtils.rethrow(e);
} }
// impossible throw new OmsException("impossible");
return null;
} }
} }

View File

@ -5,9 +5,12 @@ import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.model.WorkflowDAG; import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.Set; import java.util.Set;
/** /**
@ -64,6 +67,36 @@ public class WorkflowDAGUtils {
return new WorkflowDAG(id2Node.get(rootIds.iterator().next())); return new WorkflowDAG(id2Node.get(rootIds.iterator().next()));
} }
/**
* 将引用式DAG图转化为点线式DAG图
* @param dag 引用式DAG图
* @return 点线式DAG图
*/
public static PEWorkflowDAG convert2PE(WorkflowDAG dag) {
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
Queue<WorkflowDAG.Node> queue = Queues.newLinkedBlockingQueue();
queue.add(dag.getRoot());
while (!queue.isEmpty()) {
WorkflowDAG.Node node = queue.poll();
queue.addAll(node.getSuccessors());
// 添加点
PEWorkflowDAG.Node peNode = new PEWorkflowDAG.Node(node.getJobId(), node.getJobName(), node.getInstanceId(), node.isFinished(), node.getResult());
nodes.add(peNode);
// 添加线
node.getSuccessors().forEach(successor -> {
PEWorkflowDAG.Edge edge = new PEWorkflowDAG.Edge(node.getJobId(), successor.getJobId());
edges.add(edge);
});
}
return new PEWorkflowDAG(nodes, edges);
}
/** /**
* 校验 DAG 是否有效 * 校验 DAG 是否有效
* @param peWorkflowDAG 点线表示法的 DAG * @param peWorkflowDAG 点线表示法的 DAG

View File

@ -41,6 +41,11 @@ public class WorkflowInstanceInfoDO {
@Column(columnDefinition="TEXT") @Column(columnDefinition="TEXT")
private String result; private String result;
// 实际触发时间
private Long actualTriggerTime;
// 结束时间
private Long finishedTime;
private Date gmtCreate; private Date gmtCreate;
private Date gmtModified; private Date gmtModified;
} }

View File

@ -62,6 +62,7 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
InstanceInfoDO findByInstanceId(long instanceId); InstanceInfoDO findByInstanceId(long instanceId);
// list 三兄弟
Page<InstanceInfoDO> findByAppIdAndType(long appId, int type, Pageable pageable); Page<InstanceInfoDO> findByAppIdAndType(long appId, int type, Pageable pageable);
Page<InstanceInfoDO> findByJobIdAndType(long jobId, int type, Pageable pageable); Page<InstanceInfoDO> findByJobIdAndType(long jobId, int type, Pageable pageable);
// 只会有一条数据只是为了统一 // 只会有一条数据只是为了统一

View File

@ -1,6 +1,8 @@
package com.github.kfcfans.oms.server.persistence.core.repository; package com.github.kfcfans.oms.server.persistence.core.repository;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
@ -28,4 +30,9 @@ public interface WorkflowInstanceInfoRepository extends JpaRepository<WorkflowIn
int deleteAllByGmtModifiedBefore(Date time); int deleteAllByGmtModifiedBefore(Date time);
int countByWorkflowIdAndStatusIn(Long workflowId, List<Integer> status); int countByWorkflowIdAndStatusIn(Long workflowId, List<Integer> status);
// list 三兄弟
Page<WorkflowInstanceInfoDO> findByAppId(Long appId, Pageable pageable);
Page<WorkflowInstanceInfoDO> findByAppIdAndWfInstanceId(Long appId, Long wfInstanceId, Pageable pageable);
Page<WorkflowInstanceInfoDO> findByAppIdAndWorkflowId(Long appId, Long workflowId, Pageable pageable);
} }

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.server.service; package com.github.kfcfans.oms.server.service;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import com.github.kfcfans.oms.common.OmsConstant;
import com.github.kfcfans.oms.common.model.DeployedContainerInfo; import com.github.kfcfans.oms.common.model.DeployedContainerInfo;
import com.github.kfcfans.oms.common.model.GitRepoInfo; import com.github.kfcfans.oms.common.model.GitRepoInfo;
import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest; import com.github.kfcfans.oms.common.request.ServerDeployContainerRequest;
@ -69,8 +70,6 @@ public class ContainerService {
private static final int DEPLOY_BATCH_NUM = 50; private static final int DEPLOY_BATCH_NUM = 50;
// 部署间隔 // 部署间隔
private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000; private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000;
// 时间格式
private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
/** /**
* 保存容器 * 保存容器
@ -213,7 +212,7 @@ public class ContainerService {
Date lastDeployTime = container.getLastDeployTime(); Date lastDeployTime = container.getLastDeployTime();
if (lastDeployTime != null) { if (lastDeployTime != null) {
if ((System.currentTimeMillis() - lastDeployTime.getTime()) < DEPLOY_MIN_INTERVAL) { if ((System.currentTimeMillis() - lastDeployTime.getTime()) < DEPLOY_MIN_INTERVAL) {
remote.sendText("SYSTEM: [warn] deploy too frequent, last deploy time is: " + DateFormatUtils.format(lastDeployTime, TIME_PATTERN)); remote.sendText("SYSTEM: [warn] deploy too frequent, last deploy time is: " + DateFormatUtils.format(lastDeployTime, OmsConstant.TIME_PATTERN));
} }
} }

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.server.service; package com.github.kfcfans.oms.server.service;
import com.github.kfcfans.oms.common.OmsConstant;
import com.github.kfcfans.oms.common.TimeExpressionType; import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.model.InstanceLogContent; import com.github.kfcfans.oms.common.model.InstanceLogContent;
import com.github.kfcfans.oms.common.utils.CommonUtils; import com.github.kfcfans.oms.common.utils.CommonUtils;
@ -56,7 +57,7 @@ public class InstanceLogService {
private final ExecutorService workerPool; private final ExecutorService workerPool;
// 格式化时间戳 // 格式化时间戳
private static final FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS"); private static final FastDateFormat dateFormat = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
// 每一个展示的行数 // 每一个展示的行数
private static final int MAX_LINE_COUNT = 100; private static final int MAX_LINE_COUNT = 100;
// 过期时间 // 过期时间
@ -118,7 +119,7 @@ public class InstanceLogService {
++lines; ++lines;
} }
}catch (Exception e) { }catch (Exception e) {
log.warn("[InstanceLogService] read logFile from disk failed.", e); log.warn("[InstanceLog-{}] read logFile from disk failed.", instanceId, e);
return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e)); return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
} }
@ -128,7 +129,7 @@ public class InstanceLogService {
}catch (TimeoutException te) { }catch (TimeoutException te) {
return StringPage.simple("log file is being prepared, please try again later."); return StringPage.simple("log file is being prepared, please try again later.");
}catch (Exception e) { }catch (Exception e) {
log.warn("[InstanceLogService] fetchInstanceLog failed for instance(instanceId={}).", instanceId, e); log.warn("[InstanceLog-{}] fetch instance log failed.", instanceId, e);
return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e)); return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
} }
} }
@ -180,20 +181,20 @@ public class InstanceLogService {
if (gridFsManager.available()) { if (gridFsManager.available()) {
try { try {
gridFsManager.store(stableLogFile, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId)); gridFsManager.store(stableLogFile, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId));
log.info("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB succeed, using: {}.", instanceId, sw.stop()); log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
}catch (Exception e) { }catch (Exception e) {
log.warn("[InstanceLogService] push local instanceLogs(instanceId={}) to mongoDB failed.", instanceId, e); log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
} }
} }
}catch (Exception e) { }catch (Exception e) {
log.warn("[InstanceLogService] sync local instanceLogs(instanceId={}) failed.", instanceId, e); log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e);
} }
// 删除本地数据库数据 // 删除本地数据库数据
try { try {
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
instanceId2LastReportTime.remove(instanceId); instanceId2LastReportTime.remove(instanceId);
}catch (Exception e) { }catch (Exception e) {
log.warn("[InstanceLogService] delete local instanceLog(instanceId={}) failed.", instanceId, e); log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
} }
} }

View File

@ -58,6 +58,7 @@ public class WorkflowInstanceManager {
newWfInstance.setAppId(wfInfo.getAppId()); newWfInstance.setAppId(wfInfo.getAppId());
newWfInstance.setWfInstanceId(wfInstanceId); newWfInstance.setWfInstanceId(wfInstanceId);
newWfInstance.setWorkflowId(wfInfo.getId()); newWfInstance.setWorkflowId(wfInfo.getId());
newWfInstance.setActualTriggerTime(System.currentTimeMillis());
newWfInstance.setGmtCreate(now); newWfInstance.setGmtCreate(now);
newWfInstance.setGmtModified(now); newWfInstance.setGmtModified(now);
@ -73,6 +74,7 @@ public class WorkflowInstanceManager {
newWfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV()); newWfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV());
newWfInstance.setResult(e.getMessage()); newWfInstance.setResult(e.getMessage());
newWfInstance.setFinishedTime(System.currentTimeMillis());
} }
workflowInstanceInfoRepository.save(newWfInstance); workflowInstanceInfoRepository.save(newWfInstance);
return wfInstanceId; return wfInstanceId;
@ -103,6 +105,7 @@ public class WorkflowInstanceManager {
if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) { if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
wfInstanceInfo.setStatus(WorkflowInstanceStatus.FAILED.getV()); wfInstanceInfo.setStatus(WorkflowInstanceStatus.FAILED.getV());
wfInstanceInfo.setResult(String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, instanceConcurrency, wfInfo.getMaxWfInstanceNum())); wfInstanceInfo.setResult(String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, instanceConcurrency, wfInfo.getMaxWfInstanceNum()));
wfInstanceInfo.setFinishedTime(System.currentTimeMillis());
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo); workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
return; return;
@ -130,6 +133,7 @@ public class WorkflowInstanceManager {
wfInstanceInfo.setStatus(WorkflowInstanceStatus.FAILED.getV()); wfInstanceInfo.setStatus(WorkflowInstanceStatus.FAILED.getV());
wfInstanceInfo.setResult(e.getMessage()); wfInstanceInfo.setResult(e.getMessage());
wfInstanceInfo.setFinishedTime(System.currentTimeMillis());
log.error("[Workflow-{}] submit workflow: {} failed.", wfInfo.getId(), wfInfo, e); log.error("[Workflow-{}] submit workflow: {} failed.", wfInfo.getId(), wfInfo, e);
@ -185,6 +189,7 @@ public class WorkflowInstanceManager {
wfInstance.setDag(JsonUtils.toJSONStringUnsafe(dag)); wfInstance.setDag(JsonUtils.toJSONStringUnsafe(dag));
wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV()); wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV());
wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_FAILED); wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_FAILED);
wfInstance.setFinishedTime(System.currentTimeMillis());
workflowInstanceInfoRepository.saveAndFlush(wfInstance); workflowInstanceInfoRepository.saveAndFlush(wfInstance);
log.warn("[Workflow-{}] workflow(wfInstanceId={}) process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId); log.warn("[Workflow-{}] workflow(wfInstanceId={}) process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
@ -229,6 +234,7 @@ public class WorkflowInstanceManager {
wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV()); wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
// 最终任务的结果作为整个 workflow 的结果 // 最终任务的结果作为整个 workflow 的结果
wfInstance.setResult(result); wfInstance.setResult(result);
wfInstance.setFinishedTime(System.currentTimeMillis());
log.info("[Workflow-{}] workflowInstance(wfInstanceId={}) process successfully.", wfId, wfInstanceId); log.info("[Workflow-{}] workflowInstance(wfInstanceId={}) process successfully.", wfId, wfInstanceId);
} }
@ -241,6 +247,7 @@ public class WorkflowInstanceManager {
}catch (Exception e) { }catch (Exception e) {
wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV()); wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV());
wfInstance.setResult("MOVE NEXT STEP FAILED: " + e.getMessage()); wfInstance.setResult("MOVE NEXT STEP FAILED: " + e.getMessage());
wfInstance.setFinishedTime(System.currentTimeMillis());
workflowInstanceInfoRepository.saveAndFlush(wfInstance); workflowInstanceInfoRepository.saveAndFlush(wfInstance);
log.error("[Workflow-{}] update failed for workflowInstance({}).", wfId, wfInstanceId, e); log.error("[Workflow-{}] update failed for workflowInstance({}).", wfId, wfInstanceId, e);

View File

@ -0,0 +1,73 @@
package com.github.kfcfans.oms.server.service.workflow;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.SystemInstanceResult;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.oms.server.service.instance.InstanceService;
import com.google.common.collect.Queues;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
import java.util.Objects;
import java.util.Queue;
/**
* 工作流实例服务
*
* @author tjq
* @since 2020/5/31
*/
@Slf4j
@Service
public class WorkflowInstanceService {
@Resource
private InstanceService instanceService;
@Resource
private WorkflowInstanceInfoRepository wfInstanceInfoRepository;
/**
* 停止工作流实例
* @param wfInstanceId 工作流实例ID
* @param appId 所属应用ID
*/
public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId));
if (!Objects.equals(appId, wfInstance.getAppId())) {
throw new OmsException("Permission Denied!");
}
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
throw new OmsException("already stopped");
}
// 修改数据库状态
wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER);
wfInstance.setGmtModified(new Date());
wfInstanceInfoRepository.saveAndFlush(wfInstance);
// 停止所有已启动且未完成的服务
WorkflowDAG workflowDAG = JsonUtils.parseObjectUnsafe(wfInstance.getDag(), WorkflowDAG.class);
Queue<WorkflowDAG.Node> queue = Queues.newLinkedBlockingQueue();
queue.add(workflowDAG.getRoot());
while (!queue.isEmpty()) {
WorkflowDAG.Node node = queue.poll();
if (node.getInstanceId() != null && !node.isFinished()) {
log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId());
instanceService.stopInstance(node.getInstanceId());
}
queue.addAll(node.getSuccessors());
}
log.info("[WfInstance-{}] stop workflow instance successfully~", wfInstanceId);
}
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.server.web.controller; package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.common.OmsConstant;
import com.github.kfcfans.oms.common.response.ResultDTO; import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.common.constans.ContainerSourceType; import com.github.kfcfans.oms.server.common.constans.ContainerSourceType;
@ -119,7 +120,7 @@ public class ContainerController {
if (containerInfoDO.getLastDeployTime() == null) { if (containerInfoDO.getLastDeployTime() == null) {
vo.setLastDeployTime("N/A"); vo.setLastDeployTime("N/A");
}else { }else {
vo.setLastDeployTime(DateFormatUtils.format(containerInfoDO.getLastDeployTime(), "yyyy-MM-dd HH:mm:ss")); vo.setLastDeployTime(DateFormatUtils.format(containerInfoDO.getLastDeployTime(), OmsConstant.TIME_PATTERN));
} }
ContainerStatus status = ContainerStatus.of(containerInfoDO.getStatus()); ContainerStatus status = ContainerStatus.of(containerInfoDO.getStatus());
vo.setStatus(status.name()); vo.setStatus(status.name());

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.server.web.controller; package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.common.InstanceStatus; import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.OmsConstant;
import com.github.kfcfans.oms.common.response.ResultDTO; import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.common.model.InstanceDetail; import com.github.kfcfans.oms.common.model.InstanceDetail;
import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.OhMyServer;
@ -15,7 +16,7 @@ import com.github.kfcfans.oms.server.service.CacheService;
import com.github.kfcfans.oms.server.service.InstanceLogService; import com.github.kfcfans.oms.server.service.InstanceLogService;
import com.github.kfcfans.oms.server.service.instance.InstanceService; import com.github.kfcfans.oms.server.service.instance.InstanceService;
import com.github.kfcfans.oms.server.web.request.QueryInstanceRequest; import com.github.kfcfans.oms.server.web.request.QueryInstanceRequest;
import com.github.kfcfans.oms.server.web.response.InstanceLogVO; import com.github.kfcfans.oms.server.web.response.InstanceInfoVO;
import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -57,8 +58,6 @@ public class InstanceController {
@Resource @Resource
private InstanceInfoRepository instanceInfoRepository; private InstanceInfoRepository instanceInfoRepository;
private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
@GetMapping("/stop") @GetMapping("/stop")
public ResultDTO<Void> stopInstance(Long instanceId) { public ResultDTO<Void> stopInstance(Long instanceId) {
instanceService.stopInstance(instanceId); instanceService.stopInstance(instanceId);
@ -106,7 +105,7 @@ public class InstanceController {
} }
@PostMapping("/list") @PostMapping("/list")
public ResultDTO<PageResult<InstanceLogVO>> list(@RequestBody QueryInstanceRequest request) { public ResultDTO<PageResult<InstanceInfoVO>> list(@RequestBody QueryInstanceRequest request) {
Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified"); Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified");
PageRequest pageable = PageRequest.of(request.getIndex(), request.getPageSize(), sort); PageRequest pageable = PageRequest.of(request.getIndex(), request.getPageSize(), sort);
@ -125,36 +124,36 @@ public class InstanceController {
return ResultDTO.success(convertPage(instanceInfoRepository.findByInstanceIdAndType(request.getInstanceId(), request.getType().getV(), pageable))); return ResultDTO.success(convertPage(instanceInfoRepository.findByInstanceIdAndType(request.getInstanceId(), request.getType().getV(), pageable)));
} }
private PageResult<InstanceLogVO> convertPage(Page<InstanceInfoDO> page) { private PageResult<InstanceInfoVO> convertPage(Page<InstanceInfoDO> page) {
List<InstanceLogVO> content = page.getContent().stream().map(instanceLogDO -> { List<InstanceInfoVO> content = page.getContent().stream().map(instanceLogDO -> {
InstanceLogVO instanceLogVO = new InstanceLogVO(); InstanceInfoVO instanceInfoVO = new InstanceInfoVO();
BeanUtils.copyProperties(instanceLogDO, instanceLogVO); BeanUtils.copyProperties(instanceLogDO, instanceInfoVO);
// 状态转化为中文 // 状态转化为中文
instanceLogVO.setStatusStr(InstanceStatus.of(instanceLogDO.getStatus()).getDes()); instanceInfoVO.setStatusStr(InstanceStatus.of(instanceLogDO.getStatus()).getDes());
// 额外设置任务名称提高可读性 // 额外设置任务名称提高可读性
instanceLogVO.setJobName(cacheService.getJobName(instanceLogDO.getJobId())); instanceInfoVO.setJobName(cacheService.getJobName(instanceLogDO.getJobId()));
// ID 转化为 StringJS精度丢失 // ID 转化为 StringJS精度丢失
instanceLogVO.setJobId(instanceLogDO.getJobId().toString()); instanceInfoVO.setJobId(instanceLogDO.getJobId().toString());
instanceLogVO.setInstanceId(instanceLogDO.getInstanceId().toString()); instanceInfoVO.setInstanceId(instanceLogDO.getInstanceId().toString());
// 格式化时间 // 格式化时间
if (instanceLogDO.getActualTriggerTime() == null) { if (instanceLogDO.getActualTriggerTime() == null) {
instanceLogVO.setActualTriggerTime("N/A"); instanceInfoVO.setActualTriggerTime(OmsConstant.NONE);
}else { }else {
instanceLogVO.setActualTriggerTime(DateFormatUtils.format(instanceLogDO.getActualTriggerTime(), TIME_PATTERN)); instanceInfoVO.setActualTriggerTime(DateFormatUtils.format(instanceLogDO.getActualTriggerTime(), OmsConstant.TIME_PATTERN));
} }
if (instanceLogDO.getFinishedTime() == null) { if (instanceLogDO.getFinishedTime() == null) {
instanceLogVO.setFinishedTime("N/A"); instanceInfoVO.setFinishedTime(OmsConstant.NONE);
}else { }else {
instanceLogVO.setFinishedTime(DateFormatUtils.format(instanceLogDO.getFinishedTime(), TIME_PATTERN)); instanceInfoVO.setFinishedTime(DateFormatUtils.format(instanceLogDO.getFinishedTime(), OmsConstant.TIME_PATTERN));
} }
return instanceLogVO; return instanceInfoVO;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
PageResult<InstanceLogVO> pageResult = new PageResult<>(page); PageResult<InstanceInfoVO> pageResult = new PageResult<>(page);
pageResult.setData(content); pageResult.setData(content);
return pageResult; return pageResult;
} }

View File

@ -0,0 +1,60 @@
package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.server.persistence.PageResult;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.oms.server.service.workflow.WorkflowInstanceService;
import com.github.kfcfans.oms.server.web.request.QueryWorkflowInstanceRequest;
import com.github.kfcfans.oms.server.web.response.WorkflowInstanceInfoVO;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.stream.Collectors;
/**
* 工作流实例控制器
*
* @author tjq
* @since 2020/5/31
*/
@RestController
@RequestMapping("/wfInstance")
public class WorkflowInstanceController {
@Resource
private WorkflowInstanceService workflowInstanceService;
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@GetMapping("/stop")
public ResultDTO<Void> stopWfInstance(Long wfInstanceId, Long appId) {
workflowInstanceService.stopWorkflowInstance(wfInstanceId, appId);
return ResultDTO.success(null);
}
@PostMapping("/list")
public ResultDTO<PageResult<WorkflowInstanceInfoVO>> listWfInstance(@RequestBody QueryWorkflowInstanceRequest req) {
Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified");
PageRequest pageable = PageRequest.of(req.getIndex(), req.getPageSize(), sort);
Page<WorkflowInstanceInfoDO> ps;
if (req.getWfInstanceId() == null && req.getWorkflowId() == null) {
ps = workflowInstanceInfoRepository.findByAppId(req.getAppId(), pageable);
}else if (req.getWfInstanceId() != null) {
ps = workflowInstanceInfoRepository.findByAppIdAndWfInstanceId(req.getAppId(), req.getWfInstanceId(), pageable);
}else {
ps = workflowInstanceInfoRepository.findByAppIdAndWorkflowId(req.getAppId(), req.getWorkflowId(), pageable);
}
return ResultDTO.success(convertPage(ps));
}
private static PageResult<WorkflowInstanceInfoVO> convertPage(Page<WorkflowInstanceInfoDO> ps) {
PageResult<WorkflowInstanceInfoVO> pr = new PageResult<>(ps);
pr.setData(ps.getContent().stream().map(WorkflowInstanceInfoVO::from).collect(Collectors.toList()));
return pr;
}
}

View File

@ -0,0 +1,24 @@
package com.github.kfcfans.oms.server.web.request;
import lombok.Data;
/**
* 查询工作流实例请求
*
* @author tjq
* @since 2020/5/31
*/
@Data
public class QueryWorkflowInstanceRequest {
// 任务所属应用ID
private Long appId;
// 当前页码
private Integer index;
// 页大小
private Integer pageSize;
// 查询条件NORMAL/WORKFLOW
private Long wfInstanceId;
private Long workflowId;
}

View File

@ -3,13 +3,13 @@ package com.github.kfcfans.oms.server.web.response;
import lombok.Data; import lombok.Data;
/** /**
* ExecuteLog 对外展示对象 * InstanceInfo 对外展示对象
* *
* @author tjq * @author tjq
* @since 2020/4/12 * @since 2020/4/12
*/ */
@Data @Data
public class InstanceLogVO { public class InstanceInfoVO {
// 任务IDJS精度丢失 // 任务IDJS精度丢失
private String jobId; private String jobId;

View File

@ -0,0 +1,59 @@
package com.github.kfcfans.oms.server.web.response;
import com.github.kfcfans.oms.common.OmsConstant;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO;
import lombok.Data;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.BeanUtils;
/**
* 工作流实例视图层展示对象
*
* @author tjq
* @since 2020/5/31
*/
@Data
public class WorkflowInstanceInfoVO {
// 任务所属应用的ID冗余提高查询效率
private Long appId;
// workflowInstanceId任务实例表都使用单独的ID作为主键以支持潜在的分表需求
private Long wfInstanceId;
private Long workflowId;
// workflow 状态WorkflowInstanceStatus
private String status;
private PEWorkflowDAG pEWorkflowDAG;
private String result;
// 实际触发时间需要格式化为人看得懂的时间
private String actualTriggerTime;
// 结束时间同理需要格式化
private String finishedTime;
public static WorkflowInstanceInfoVO from(WorkflowInstanceInfoDO wfInstanceDO) {
WorkflowInstanceInfoVO vo = new WorkflowInstanceInfoVO();
BeanUtils.copyProperties(wfInstanceDO, vo);
vo.setStatus(WorkflowInstanceStatus.of(wfInstanceDO.getStatus()).getDes());
vo.setPEWorkflowDAG(WorkflowDAGUtils.convert2PE(JsonUtils.parseObjectUnsafe(wfInstanceDO.getDag(), WorkflowDAG.class)));
// 格式化时间
vo.setActualTriggerTime(DateFormatUtils.format(wfInstanceDO.getActualTriggerTime(), OmsConstant.TIME_PATTERN));
if (wfInstanceDO.getFinishedTime() == null) {
vo.setFinishedTime(OmsConstant.NONE);
}else {
vo.setFinishedTime(DateFormatUtils.format(wfInstanceDO.getFinishedTime(), OmsConstant.TIME_PATTERN));
}
return vo;
}
}

View File

@ -1,10 +1,7 @@
package com.github.kfcfans.oms.worker.core.tracker.task; package com.github.kfcfans.oms.worker.core.tracker.task;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import com.github.kfcfans.oms.common.ExecuteType; import com.github.kfcfans.oms.common.*;
import com.github.kfcfans.oms.common.InstanceStatus;
import com.github.kfcfans.oms.common.RemoteConstant;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.model.InstanceDetail; import com.github.kfcfans.oms.common.model.InstanceDetail;
import com.github.kfcfans.oms.common.request.ServerScheduleJobReq; import com.github.kfcfans.oms.common.request.ServerScheduleJobReq;
import com.github.kfcfans.oms.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.oms.common.request.TaskTrackerReportInstanceStatusReq;
@ -64,7 +61,6 @@ public class FrequentTaskTracker extends TaskTracker {
private static final int HISTORY_SIZE = 10; private static final int HISTORY_SIZE = 10;
private static final String LAST_TASK_ID_PREFIX = "L"; private static final String LAST_TASK_ID_PREFIX = "L";
private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
protected FrequentTaskTracker(ServerScheduleJobReq req) { protected FrequentTaskTracker(ServerScheduleJobReq req) {
super(req); super(req);
@ -121,9 +117,9 @@ public class FrequentTaskTracker extends TaskTracker {
subDetail.setSubInstanceId(subId); subDetail.setSubInstanceId(subId);
// 设置时间 // 设置时间
subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), TIME_PATTERN)); subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), OmsConstant.TIME_PATTERN));
if (status == InstanceStatus.SUCCEED || status == InstanceStatus.FAILED) { if (status == InstanceStatus.SUCCEED || status == InstanceStatus.FAILED) {
subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), TIME_PATTERN)); subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), OmsConstant.TIME_PATTERN));
}else { }else {
subDetail.setFinishedTime("N/A"); subDetail.setFinishedTime("N/A");
} }