mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
[dev] OhMyClient add workflow api
This commit is contained in:
parent
d1f92d5ebf
commit
05fc44e11f
@ -4,9 +4,11 @@ import com.github.kfcfans.oms.common.InstanceStatus;
|
||||
import com.github.kfcfans.oms.common.OmsException;
|
||||
import com.github.kfcfans.oms.common.OpenAPIConstant;
|
||||
import com.github.kfcfans.oms.common.request.http.SaveJobInfoRequest;
|
||||
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
|
||||
import com.github.kfcfans.oms.common.response.InstanceInfoDTO;
|
||||
import com.github.kfcfans.oms.common.response.JobInfoDTO;
|
||||
import com.github.kfcfans.oms.common.response.ResultDTO;
|
||||
import com.github.kfcfans.oms.common.response.WorkflowInfoDTO;
|
||||
import com.github.kfcfans.oms.common.utils.HttpUtils;
|
||||
import com.github.kfcfans.oms.common.utils.JsonUtils;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -57,7 +59,7 @@ public class OhMyClient {
|
||||
|
||||
allAddress = addressList;
|
||||
for (String addr : addressList) {
|
||||
String url = getUrl(addr, addr) + "?appName=" + appName;
|
||||
String url = getUrl(OpenAPIConstant.ASSERT, addr) + "?appName=" + appName;
|
||||
try {
|
||||
String result = HttpUtils.get(url);
|
||||
if (StringUtils.isNotEmpty(result)) {
|
||||
@ -186,7 +188,7 @@ public class OhMyClient {
|
||||
/**
|
||||
* 停止应用实例
|
||||
* @param instanceId 应用实例ID
|
||||
* @return true -> 停止成功,false -> 停止失败
|
||||
* @return true 停止成功,false 停止失败
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public ResultDTO<Void> stopInstance(Long instanceId) throws Exception {
|
||||
@ -226,6 +228,128 @@ public class OhMyClient {
|
||||
return JsonUtils.parseObject(post, ResultDTO.class);
|
||||
}
|
||||
|
||||
/* ************* Workflow 区 ************* */
|
||||
/**
|
||||
* 保存工作流(包括创建和修改)
|
||||
* @param request 创建/修改 Workflow 请求
|
||||
* @return 工作流ID
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public ResultDTO<Long> saveWorkflow(SaveWorkflowRequest request) throws Exception {
|
||||
request.setAppId(appId);
|
||||
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
|
||||
String json = JsonUtils.toJSONStringUnsafe(request);
|
||||
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(json, jsonType));
|
||||
return JsonUtils.parseObject(post, ResultDTO.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 workflowId 查询工作流信息
|
||||
* @param workflowId workflowId
|
||||
* @return 工作流信息
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public ResultDTO<WorkflowInfoDTO> fetchWorkflow(Long workflowId) throws Exception {
|
||||
RequestBody body = new FormBody.Builder()
|
||||
.add("workflowId", workflowId.toString())
|
||||
.add("appId", appId.toString())
|
||||
.build();
|
||||
String post = postHA(OpenAPIConstant.FETCH_WORKFLOW, body);
|
||||
return JsonUtils.parseObject(post, ResultDTO.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 禁用某个工作流
|
||||
* @param workflowId 工作流ID
|
||||
* @return 标准返回对象
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public ResultDTO<Void> disableWorkflow(Long workflowId) throws Exception {
|
||||
RequestBody body = new FormBody.Builder()
|
||||
.add("workflowId", workflowId.toString())
|
||||
.add("appId", appId.toString())
|
||||
.build();
|
||||
String post = postHA(OpenAPIConstant.DISABLE_WORKFLOW, body);
|
||||
return JsonUtils.parseObject(post, ResultDTO.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 启用某个工作流
|
||||
* @param workflowId workflowId
|
||||
* @return 标准返回对象
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public ResultDTO<Void> enableWorkflow(Long workflowId) throws Exception {
|
||||
RequestBody body = new FormBody.Builder()
|
||||
.add("workflowId", workflowId.toString())
|
||||
.add("appId", appId.toString())
|
||||
.build();
|
||||
String post = postHA(OpenAPIConstant.ENABLE_WORKFLOW, body);
|
||||
return JsonUtils.parseObject(post, ResultDTO.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除某个工作流
|
||||
* @param workflowId workflowId
|
||||
* @return 标准返回对象
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public ResultDTO<Void> deleteWorkflow(Long workflowId) throws Exception {
|
||||
RequestBody body = new FormBody.Builder()
|
||||
.add("workflowId", workflowId.toString())
|
||||
.add("appId", appId.toString())
|
||||
.build();
|
||||
String post = postHA(OpenAPIConstant.DELETE_WORKFLOW, body);
|
||||
return JsonUtils.parseObject(post, ResultDTO.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 运行工作流
|
||||
* @param workflowId workflowId
|
||||
* @return 工作流实例ID
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public ResultDTO<Long> runWorkflow(Long workflowId) throws Exception {
|
||||
FormBody.Builder builder = new FormBody.Builder()
|
||||
.add("workflowId", workflowId.toString())
|
||||
.add("appId", appId.toString());
|
||||
String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build());
|
||||
return JsonUtils.parseObject(post, ResultDTO.class);
|
||||
}
|
||||
|
||||
/* ************* Workflow Instance 区 ************* */
|
||||
/**
|
||||
* 停止应用实例
|
||||
* @param wfInstanceId 工作流实例ID
|
||||
* @return true 停止成功 ; false 停止失败
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public ResultDTO<Void> stopWorkflowInstance(Long wfInstanceId) throws Exception {
|
||||
RequestBody body = new FormBody.Builder()
|
||||
.add("wfInstanceId", wfInstanceId.toString())
|
||||
.add("appId", appId.toString())
|
||||
.build();
|
||||
String post = postHA(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, body);
|
||||
return JsonUtils.parseObject(post, ResultDTO.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询任务实例的信息
|
||||
* @param wfInstanceId 任务实例ID
|
||||
* @return 任务实例信息
|
||||
* @throws Exception 潜在的异常
|
||||
*/
|
||||
public ResultDTO<InstanceInfoDTO> fetchWorkflowInstanceInfo(Long wfInstanceId) throws Exception {
|
||||
RequestBody body = new FormBody.Builder()
|
||||
.add("wfInstanceId", wfInstanceId.toString())
|
||||
.add("appId", appId.toString())
|
||||
.build();
|
||||
String post = postHA(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, body);
|
||||
return JsonUtils.parseObject(post, ResultDTO.class);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private String postHA(String path, RequestBody requestBody) {
|
||||
|
||||
// 先尝试默认地址
|
||||
|
72
oh-my-scheduler-client/src/test/java/TestWorkflow.java
Normal file
72
oh-my-scheduler-client/src/test/java/TestWorkflow.java
Normal file
@ -0,0 +1,72 @@
|
||||
import com.github.kfcfans.oms.client.OhMyClient;
|
||||
import com.github.kfcfans.oms.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
|
||||
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 测试 Client(workflow部分)
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/6/2
|
||||
*/
|
||||
public class TestWorkflow {
|
||||
|
||||
private static OhMyClient ohMyClient;
|
||||
|
||||
@BeforeAll
|
||||
public static void initClient() throws Exception {
|
||||
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSaveWorkflow() throws Exception {
|
||||
|
||||
// DAG 图
|
||||
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
|
||||
|
||||
nodes.add(new PEWorkflowDAG.Node(1L, "node-1", null, false, null));
|
||||
nodes.add(new PEWorkflowDAG.Node(2L, "node-2", null, false, null));
|
||||
|
||||
PEWorkflowDAG peWorkflowDAG = new PEWorkflowDAG(nodes, null);
|
||||
SaveWorkflowRequest req = new SaveWorkflowRequest();
|
||||
|
||||
req.setWfName("workflow-by-client");
|
||||
req.setWfDescription("created by client");
|
||||
req.setPEWorkflowDAG(peWorkflowDAG);
|
||||
req.setEnable(true);
|
||||
req.setTimeExpressionType(TimeExpressionType.API);
|
||||
|
||||
System.out.println(ohMyClient.saveWorkflow(req));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisableWorkflow() throws Exception {
|
||||
System.out.println(ohMyClient.disableWorkflow(1L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteWorkflow() throws Exception {
|
||||
System.out.println(ohMyClient.deleteWorkflow(1L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnableWorkflow() throws Exception {
|
||||
System.out.println(ohMyClient.enableWorkflow(1L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchWorkflowInfo() throws Exception {
|
||||
System.out.println(ohMyClient.fetchWorkflow(1L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunWorkflow() throws Exception {
|
||||
System.out.println(ohMyClient.runWorkflow(1L));
|
||||
}
|
||||
|
||||
}
|
@ -11,6 +11,8 @@ public class OpenAPIConstant {
|
||||
public static final String WEB_PATH = "/openApi";
|
||||
|
||||
public static final String ASSERT = "/assert";
|
||||
|
||||
/* ************* JOB 区 ************* */
|
||||
public static final String SAVE_JOB = "/saveJob";
|
||||
public static final String FETCH_JOB = "/fetchJob";
|
||||
public static final String DISABLE_JOB = "/disableJob";
|
||||
@ -18,9 +20,20 @@ public class OpenAPIConstant {
|
||||
public static final String DELETE_JOB = "/deleteJob";
|
||||
public static final String RUN_JOB = "/runJob";
|
||||
|
||||
|
||||
/* ************* Instance 区 ************* */
|
||||
public static final String STOP_INSTANCE = "/stopInstance";
|
||||
public static final String FETCH_INSTANCE_STATUS = "/fetchInstanceStatus";
|
||||
public static final String FETCH_INSTANCE_INFO = "/fetchInstanceInfo";
|
||||
|
||||
/* ************* Workflow 区 ************* */
|
||||
public static final String SAVE_WORKFLOW = "/saveWorkflow";
|
||||
public static final String FETCH_WORKFLOW = "/fetchWorkflow";
|
||||
public static final String DISABLE_WORKFLOW = "/disableWorkflow";
|
||||
public static final String ENABLE_WORKFLOW = "/enableWorkflow";
|
||||
public static final String DELETE_WORKFLOW = "/deleteWorkflow";
|
||||
public static final String RUN_WORKFLOW = "/runWorkflow";
|
||||
|
||||
/* ************* WorkflowInstance 区 ************* */
|
||||
public static final String STOP_WORKFLOW_INSTANCE = "/stopWfInstance";
|
||||
public static final String FETCH_WORKFLOW_INSTANCE_INFO = "/fetchWfInstanceInfo";
|
||||
}
|
||||
|
@ -19,7 +19,8 @@ public enum TimeExpressionType {
|
||||
API(1),
|
||||
CRON(2),
|
||||
FIX_RATE(3),
|
||||
FIX_DELAY(4);
|
||||
FIX_DELAY(4),
|
||||
WORKFLOW(5);
|
||||
|
||||
int v;
|
||||
|
||||
|
@ -1,9 +1,12 @@
|
||||
package com.github.kfcfans.oms.common.model;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -15,7 +18,6 @@ import java.util.List;
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class PEWorkflowDAG {
|
||||
|
||||
// DAG 图(点线表示法)
|
||||
@ -44,4 +46,9 @@ public class PEWorkflowDAG {
|
||||
private Long from;
|
||||
private Long to;
|
||||
}
|
||||
|
||||
public PEWorkflowDAG(@Nonnull List<Node> nodes, @Nullable List<Edge> edges) {
|
||||
this.nodes = nodes;
|
||||
this.edges = edges == null ? Lists.newLinkedList() : edges;
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ public class SaveJobInfoRequest {
|
||||
private String jobName;
|
||||
// 任务描述
|
||||
private String jobDescription;
|
||||
// 任务所属的应用ID(Client无需填写该参数)
|
||||
// 任务所属的应用ID(Client无需填写该参数,自动填充)
|
||||
private Long appId;
|
||||
// 任务自带的参数
|
||||
private String jobParams;
|
||||
|
@ -2,6 +2,7 @@ package com.github.kfcfans.oms.common.request.http;
|
||||
|
||||
import com.github.kfcfans.oms.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
@ -17,27 +18,29 @@ public class SaveWorkflowRequest {
|
||||
|
||||
private Long id;
|
||||
|
||||
// 工作流名称
|
||||
private String wfName;
|
||||
// 工作流描述
|
||||
private String wfDescription;
|
||||
|
||||
// 所属应用ID
|
||||
// 所属应用ID(OpenClient不需要用户填写,自动填充)
|
||||
private Long appId;
|
||||
|
||||
// 点线表示法
|
||||
private PEWorkflowDAG pEWorkflowDAG;
|
||||
|
||||
/* ************************** 定时参数 ************************** */
|
||||
// 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY)
|
||||
// 时间表达式类型,仅支持 CRON 和 API
|
||||
private TimeExpressionType timeExpressionType;
|
||||
// 时间表达式,CRON/NULL/LONG/LONG
|
||||
private String timeExpression;
|
||||
|
||||
// 最大同时运行的工作流个数,默认 1
|
||||
private Integer maxWfInstanceNum;
|
||||
private Integer maxWfInstanceNum = 1;
|
||||
|
||||
// ENABLE / DISABLE
|
||||
private boolean enable;
|
||||
private boolean enable = true;
|
||||
|
||||
// 工作流整体失败的报警
|
||||
private List<Long> notifyUserIds;
|
||||
private List<Long> notifyUserIds = Lists.newLinkedList();
|
||||
}
|
||||
|
@ -0,0 +1,45 @@
|
||||
package com.github.kfcfans.oms.common.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* workflowInfo 对外输出对象
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/6/2
|
||||
*/
|
||||
@Data
|
||||
public class WorkflowInfoDTO {
|
||||
|
||||
private Long id;
|
||||
private String wfName;
|
||||
private String wfDescription;
|
||||
|
||||
// 所属应用ID
|
||||
private Long appId;
|
||||
|
||||
// 工作流的DAG图信息(点线式DAG的json)
|
||||
private String peDAG;
|
||||
|
||||
/* ************************** 定时参数 ************************** */
|
||||
// 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY)
|
||||
private Integer timeExpressionType;
|
||||
// 时间表达式,CRON/NULL/LONG/LONG
|
||||
private String timeExpression;
|
||||
|
||||
// 最大同时运行的工作流个数,默认 1
|
||||
private Integer maxWfInstanceNum;
|
||||
|
||||
// 1 正常运行,2 停止(不再调度)
|
||||
private Integer status;
|
||||
// 下一次调度时间
|
||||
private Long nextTriggerTime;
|
||||
|
||||
// 工作流整体失败的报警
|
||||
private String notifyUserIds;
|
||||
|
||||
private Date gmtCreate;
|
||||
private Date gmtModified;
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
package com.github.kfcfans.oms.common.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* WorkflowInstanceInfo 对外输出对象
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/6/2
|
||||
*/
|
||||
@Data
|
||||
public class WorkflowInstanceInfoDTO {
|
||||
|
||||
private Long id;
|
||||
private Long appId;
|
||||
|
||||
private Long wfInstanceId;
|
||||
private Long workflowId;
|
||||
|
||||
// workflow 状态(WorkflowInstanceStatus)
|
||||
private Integer status;
|
||||
|
||||
private String dag;
|
||||
private String result;
|
||||
|
||||
// 实际触发时间
|
||||
private Long actualTriggerTime;
|
||||
// 结束时间
|
||||
private Long finishedTime;
|
||||
|
||||
private Date gmtCreate;
|
||||
private Date gmtModified;
|
||||
}
|
@ -40,19 +40,19 @@ public class CacheService {
|
||||
public CacheService() {
|
||||
jobId2JobNameCache = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(Duration.ofMinutes(1))
|
||||
.maximumSize(1024)
|
||||
.maximumSize(512)
|
||||
.build();
|
||||
|
||||
workflowId2WorkflowNameCache = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(Duration.ofMinutes(1))
|
||||
.maximumSize(1024)
|
||||
.maximumSize(512)
|
||||
.build();
|
||||
|
||||
instanceId2AppId = CacheBuilder.newBuilder()
|
||||
.maximumSize(4096)
|
||||
.maximumSize(1024)
|
||||
.build();
|
||||
jobId2AppId = CacheBuilder.newBuilder()
|
||||
.maximumSize(4096)
|
||||
.maximumSize(1024)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -55,7 +55,7 @@ public class DispatchService {
|
||||
*/
|
||||
public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams, Long wfInstanceId) {
|
||||
Long jobId = jobInfo.getId();
|
||||
log.info("[DispatchService] start to dispatch job: {};instancePrams: {}.", jobInfo, instanceParams);
|
||||
log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, instanceParams);
|
||||
|
||||
String dbInstanceParams = instanceParams == null ? "" : instanceParams;
|
||||
|
||||
@ -66,7 +66,7 @@ public class DispatchService {
|
||||
// 超出最大同时运行限制,不执行调度
|
||||
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
|
||||
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
|
||||
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount);
|
||||
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount);
|
||||
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams);
|
||||
|
||||
InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);
|
||||
@ -91,7 +91,7 @@ public class DispatchService {
|
||||
|
||||
if (CollectionUtils.isEmpty(finalWorkers)) {
|
||||
String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId());
|
||||
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription);
|
||||
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription);
|
||||
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams);
|
||||
|
||||
InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
|
||||
@ -137,7 +137,7 @@ public class DispatchService {
|
||||
String taskTrackerAddress = finalWorkers.get(0);
|
||||
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress);
|
||||
taskTrackerActor.tell(req, null);
|
||||
log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString());
|
||||
log.debug("[Dispatcher-{}|{}] send request({}) to TaskTracker({}) succeed.", jobId, instanceId, req, taskTrackerActor.pathString());
|
||||
|
||||
// 修改状态
|
||||
instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, dbInstanceParams);
|
||||
|
@ -11,7 +11,6 @@ import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.id.IdGenerateService;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
@ -39,8 +38,6 @@ public class JobService {
|
||||
@Resource
|
||||
private DispatchService dispatchService;
|
||||
@Resource
|
||||
private IdGenerateService idGenerateService;
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
@ -157,8 +154,7 @@ public class JobService {
|
||||
jobInfoRepository.saveAndFlush(jobInfoDO);
|
||||
|
||||
// 2. 关闭秒级任务
|
||||
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
|
||||
if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) {
|
||||
if (!TimeExpressionType.frequentTypes.contains(jobInfoDO.getTimeExpressionType())) {
|
||||
return;
|
||||
}
|
||||
List<InstanceInfoDO> executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus);
|
||||
|
@ -76,7 +76,7 @@ public class WorkflowInstanceManager {
|
||||
newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
|
||||
|
||||
}catch (Exception e) {
|
||||
log.error("[Workflow-{}] parse PEDag({}) failed.", wfInfo.getId(), wfInfo.getPeDAG(), e);
|
||||
log.error("[Workflow-{}|{}] parse PEDag({}) failed.", wfInfo.getId(), wfInstanceId, wfInfo.getPeDAG(), e);
|
||||
|
||||
newWfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV());
|
||||
newWfInstance.setResult(e.getMessage());
|
||||
@ -102,7 +102,7 @@ public class WorkflowInstanceManager {
|
||||
|
||||
// 不是等待中,不再继续执行(可能上一流程已经失败)
|
||||
if (wfInstanceInfo.getStatus() != WorkflowInstanceStatus.WAITING.getV()) {
|
||||
log.info("[Workflow-{}] workflowInstance({}) need't running any more.", wfInfo.getId(), wfInstanceInfo);
|
||||
log.info("[Workflow-{}|{}] workflowInstance({}) need't running any more.", wfInfo.getId(), wfInstanceId, wfInstanceInfo);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -132,7 +132,7 @@ public class WorkflowInstanceManager {
|
||||
wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV());
|
||||
wfInstanceInfo.setDag(JSONObject.toJSONString(workflowDAG));
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
|
||||
log.info("[Workflow-{}] start workflow successfully, wfInstanceId={}", wfInfo.getId(), wfInstanceId);
|
||||
log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
|
||||
|
||||
// 真正开始执行根任务
|
||||
roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, null));
|
||||
@ -142,7 +142,7 @@ public class WorkflowInstanceManager {
|
||||
wfInstanceInfo.setResult(e.getMessage());
|
||||
wfInstanceInfo.setFinishedTime(System.currentTimeMillis());
|
||||
|
||||
log.error("[Workflow-{}] submit workflow: {} failed.", wfInfo.getId(), wfInfo, e);
|
||||
log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
|
||||
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
|
||||
}
|
||||
@ -181,7 +181,7 @@ public class WorkflowInstanceManager {
|
||||
head.setFinished(true);
|
||||
head.setResult(result);
|
||||
|
||||
log.debug("[Workflow-{}] node(jobId={}) finished in workflowInstance(wfInstanceId={}), success={},result={}", wfId, head.getJobId(), wfInstanceId, success, result);
|
||||
log.debug("[Workflow-{}|{}] node(jobId={}) finished in workflowInstance, success={},result={}", wfId, wfInstanceId, head.getJobId(), success, result);
|
||||
}
|
||||
queue.addAll(head.getSuccessors());
|
||||
|
||||
@ -197,7 +197,7 @@ public class WorkflowInstanceManager {
|
||||
wfInstance.setFinishedTime(System.currentTimeMillis());
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
|
||||
|
||||
log.warn("[Workflow-{}] workflow(wfInstanceId={}) process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
|
||||
log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -236,7 +236,7 @@ public class WorkflowInstanceManager {
|
||||
jobId2InstanceId.put(jobId, newInstanceId);
|
||||
jobId2InstanceParams.put(jobId, JSONObject.toJSONString(preJobId2Result));
|
||||
|
||||
log.debug("[Workflow-{}] workflowInstance(wfInstanceId={}) start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId);
|
||||
log.debug("[Workflow-{}|{}] workflowInstance start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId);
|
||||
});
|
||||
|
||||
if (allFinished.get()) {
|
||||
@ -245,7 +245,7 @@ public class WorkflowInstanceManager {
|
||||
wfInstance.setResult(result);
|
||||
wfInstance.setFinishedTime(System.currentTimeMillis());
|
||||
|
||||
log.info("[Workflow-{}] workflowInstance(wfInstanceId={}) process successfully.", wfId, wfInstanceId);
|
||||
log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
|
||||
}
|
||||
wfInstance.setDag(JSONObject.toJSONString(dag));
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
|
||||
@ -259,7 +259,7 @@ public class WorkflowInstanceManager {
|
||||
wfInstance.setFinishedTime(System.currentTimeMillis());
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
|
||||
|
||||
log.error("[Workflow-{}] update failed for workflowInstance({}).", wfId, wfInstanceId, e);
|
||||
log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -274,7 +274,7 @@ public class WorkflowInstanceManager {
|
||||
private void runInstance(Long jobId, Long instanceId, Long wfInstanceId, String instanceParams) {
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
|
||||
// 洗去时间表达式类型
|
||||
jobInfo.setTimeExpressionType(TimeExpressionType.API.getV());
|
||||
jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV());
|
||||
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, wfInstanceId);
|
||||
}
|
||||
|
||||
|
@ -4,12 +4,14 @@ import com.alibaba.fastjson.JSONObject;
|
||||
import com.github.kfcfans.oms.common.OmsException;
|
||||
import com.github.kfcfans.oms.common.SystemInstanceResult;
|
||||
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
|
||||
import com.github.kfcfans.oms.common.response.WorkflowInstanceInfoDTO;
|
||||
import com.github.kfcfans.oms.server.model.WorkflowDAG;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceService;
|
||||
import com.google.common.collect.Queues;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
@ -39,10 +41,7 @@ public class WorkflowInstanceService {
|
||||
* @param appId 所属应用ID
|
||||
*/
|
||||
public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
|
||||
WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId));
|
||||
if (!Objects.equals(appId, wfInstance.getAppId())) {
|
||||
throw new OmsException("Permission Denied!");
|
||||
}
|
||||
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
|
||||
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
|
||||
throw new OmsException("workflow instance already stopped");
|
||||
}
|
||||
@ -70,4 +69,19 @@ public class WorkflowInstanceService {
|
||||
log.info("[WfInstance-{}] stop workflow instance successfully~", wfInstanceId);
|
||||
}
|
||||
|
||||
public WorkflowInstanceInfoDTO fetchWorkflowInstanceInfo(Long wfInstanceId, Long appId) {
|
||||
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
|
||||
WorkflowInstanceInfoDTO dto = new WorkflowInstanceInfoDTO();
|
||||
BeanUtils.copyProperties(wfInstance, dto);
|
||||
return dto;
|
||||
}
|
||||
|
||||
private WorkflowInstanceInfoDO fetchWfInstance(Long wfInstanceId, Long appId) {
|
||||
WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId));
|
||||
if (!Objects.equals(appId, wfInstance.getAppId())) {
|
||||
throw new OmsException("Permission Denied!");
|
||||
}
|
||||
return wfInstance;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
|
||||
import com.github.kfcfans.oms.common.OmsException;
|
||||
import com.github.kfcfans.oms.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
|
||||
import com.github.kfcfans.oms.common.response.WorkflowInfoDTO;
|
||||
import com.github.kfcfans.oms.server.common.utils.WorkflowDAGUtils;
|
||||
import com.github.kfcfans.oms.server.common.SJ;
|
||||
import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
|
||||
@ -70,6 +71,19 @@ public class WorkflowService {
|
||||
return newEntity.getId();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取工作流元信息
|
||||
* @param wfId 工作流ID
|
||||
* @param appId 应用ID
|
||||
* @return 对外输出对象
|
||||
*/
|
||||
public WorkflowInfoDTO fetchWorkflow(Long wfId, Long appId) {
|
||||
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
|
||||
WorkflowInfoDTO dto = new WorkflowInfoDTO();
|
||||
BeanUtils.copyProperties(wfInfo, dto);
|
||||
return dto;
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除工作流(软删除)
|
||||
* @param wfId 工作流ID
|
||||
@ -94,6 +108,18 @@ public class WorkflowService {
|
||||
workflowInfoRepository.saveAndFlush(wfInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 启用工作流
|
||||
* @param wfId 工作流ID
|
||||
* @param appId 所属应用ID
|
||||
*/
|
||||
public void enableWorkflow(Long wfId, Long appId) {
|
||||
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
|
||||
wfInfo.setStatus(SwitchableStatus.ENABLE.getV());
|
||||
wfInfo.setGmtModified(new Date());
|
||||
workflowInfoRepository.saveAndFlush(wfInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 立即运行工作流
|
||||
* @param wfId 工作流ID
|
||||
|
@ -2,15 +2,16 @@ package com.github.kfcfans.oms.server.web.controller;
|
||||
|
||||
import com.github.kfcfans.oms.common.InstanceStatus;
|
||||
import com.github.kfcfans.oms.common.OpenAPIConstant;
|
||||
import com.github.kfcfans.oms.common.response.InstanceInfoDTO;
|
||||
import com.github.kfcfans.oms.common.response.JobInfoDTO;
|
||||
import com.github.kfcfans.oms.common.response.ResultDTO;
|
||||
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
|
||||
import com.github.kfcfans.oms.common.response.*;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.CacheService;
|
||||
import com.github.kfcfans.oms.server.service.JobService;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceService;
|
||||
import com.github.kfcfans.oms.common.request.http.SaveJobInfoRequest;
|
||||
import com.github.kfcfans.oms.server.service.workflow.WorkflowInstanceService;
|
||||
import com.github.kfcfans.oms.server.service.workflow.WorkflowService;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
@ -29,6 +30,10 @@ public class OpenAPIController {
|
||||
private JobService jobService;
|
||||
@Resource
|
||||
private InstanceService instanceService;
|
||||
@Resource
|
||||
private WorkflowService workflowService;
|
||||
@Resource
|
||||
private WorkflowInstanceService workflowInstanceService;
|
||||
|
||||
@Resource
|
||||
private CacheService cacheService;
|
||||
@ -105,6 +110,52 @@ public class OpenAPIController {
|
||||
return ResultDTO.success(instanceService.getInstanceInfo(instanceId));
|
||||
}
|
||||
|
||||
/* ************* Workflow 区 ************* */
|
||||
@PostMapping(OpenAPIConstant.SAVE_WORKFLOW)
|
||||
public ResultDTO<Long> saveWorkflow(@RequestBody SaveWorkflowRequest request) throws Exception {
|
||||
if (request.getId() != null) {
|
||||
checkJobIdValid(request.getId(), request.getAppId());
|
||||
}
|
||||
return ResultDTO.success(workflowService.saveWorkflow(request));
|
||||
}
|
||||
|
||||
@PostMapping(OpenAPIConstant.FETCH_WORKFLOW)
|
||||
public ResultDTO<WorkflowInfoDTO> fetchWorkflow(Long workflowId, Long appId) {
|
||||
return ResultDTO.success(workflowService.fetchWorkflow(workflowId, appId));
|
||||
}
|
||||
|
||||
@PostMapping(OpenAPIConstant.DELETE_WORKFLOW)
|
||||
public ResultDTO<Void> deleteWorkflow(Long workflowId, Long appId) {
|
||||
workflowService.deleteWorkflow(workflowId, appId);
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
@PostMapping(OpenAPIConstant.DISABLE_WORKFLOW)
|
||||
public ResultDTO<Void> disableWorkflow(Long workflowId, Long appId) {
|
||||
workflowService.disableWorkflow(workflowId, appId);
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
@PostMapping(OpenAPIConstant.ENABLE_WORKFLOW)
|
||||
public ResultDTO<Void> enableWorkflow(Long workflowId, Long appId) {
|
||||
workflowService.enableWorkflow(workflowId, appId);
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
|
||||
@PostMapping(OpenAPIConstant.RUN_WORKFLOW)
|
||||
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) {
|
||||
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId));
|
||||
}
|
||||
|
||||
/* ************* Workflow Instance 区 ************* */
|
||||
@PostMapping(OpenAPIConstant.STOP_WORKFLOW_INSTANCE)
|
||||
public ResultDTO<Void> stopWorkflowInstance(Long wfInstance, Long appId) {
|
||||
workflowInstanceService.stopWorkflowInstance(wfInstance, appId);
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
@PostMapping(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO)
|
||||
public ResultDTO<WorkflowInstanceInfoDTO> fetchWorkflowInstanceInfo(Long wfInstanceId, Long appId) {
|
||||
return ResultDTO.success(workflowInstanceService.fetchWorkflowInstanceInfo(wfInstanceId, appId));
|
||||
}
|
||||
|
||||
private void checkInstanceIdValid(Long instanceId, Long appId) {
|
||||
Long realAppId = cacheService.getAppIdByInstanceId(instanceId);
|
||||
if (realAppId == null) {
|
||||
|
@ -20,6 +20,11 @@
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- 定时调度信息控制台就不输出了,看着就乱 -->
|
||||
<logger name="com.github.kfcfans.oms.server.service.timing" level="WARN" additivity="false">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</logger>
|
||||
|
||||
<logger name="com.github.kfcfans.oms" level="DEBUG" additivity="false">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</logger>
|
||||
|
Loading…
x
Reference in New Issue
Block a user