mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: 工作流支持重复的任务节点、支持任务参数(静态)个性化、调整实例参数传递机制
This commit is contained in:
parent
6d9af1cff0
commit
fd36f2f8e1
@ -0,0 +1,18 @@
|
||||
package com.github.kfcfans.powerjob.common;
|
||||
|
||||
/**
 * Constants related to the workflow context.
 *
 * @author Echo009
 * @since 2021/2/3
 */
public final class WorkflowContextConstant {

    /**
     * Key of the initial parameters in the workflow context.
     */
    public static final String CONTEXT_INIT_PARAMS_KEY = "initParams";

    /**
     * Constants holder only; not meant to be instantiated.
     */
    private WorkflowContextConstant() {
    }

}
|
@ -21,6 +21,9 @@ import java.util.Set;
|
||||
*/
|
||||
public class WorkflowDAGUtils {
|
||||
|
||||
private WorkflowDAGUtils() {
    // Static utility class: the private constructor prevents instantiation.
}
|
||||
/**
|
||||
* 获取所有根节点
|
||||
*
|
||||
@ -29,11 +32,11 @@ public class WorkflowDAGUtils {
|
||||
*/
|
||||
public static List<PEWorkflowDAG.Node> listRoots(PEWorkflowDAG peWorkflowDAG) {
|
||||
|
||||
Map<Long, PEWorkflowDAG.Node> jobId2Node = Maps.newHashMap();
|
||||
peWorkflowDAG.getNodes().forEach(node -> jobId2Node.put(node.getJobId(), node));
|
||||
peWorkflowDAG.getEdges().forEach(edge -> jobId2Node.remove(edge.getTo()));
|
||||
Map<Long, PEWorkflowDAG.Node> nodeId2Node = Maps.newHashMap();
|
||||
peWorkflowDAG.getNodes().forEach(node -> nodeId2Node.put(node.getNodeId(), node));
|
||||
peWorkflowDAG.getEdges().forEach(edge -> nodeId2Node.remove(edge.getTo()));
|
||||
|
||||
return Lists.newLinkedList(jobId2Node.values());
|
||||
return Lists.newLinkedList(nodeId2Node.values());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -44,13 +47,13 @@ public class WorkflowDAGUtils {
|
||||
*/
|
||||
public static boolean valid(PEWorkflowDAG peWorkflowDAG) {
|
||||
|
||||
// 点不允许重复,一个工作流中某个任务只允许出现一次
|
||||
Set<Long> jobIds = Sets.newHashSet();
|
||||
// 节点ID
|
||||
Set<Long> nodeIds = Sets.newHashSet();
|
||||
for (PEWorkflowDAG.Node n : peWorkflowDAG.getNodes()) {
|
||||
if (jobIds.contains(n.getJobId())) {
|
||||
if (nodeIds.contains(n.getNodeId())) {
|
||||
return false;
|
||||
}
|
||||
jobIds.add(n.getJobId());
|
||||
nodeIds.add(n.getNodeId());
|
||||
}
|
||||
|
||||
try {
|
||||
@ -64,6 +67,7 @@ public class WorkflowDAGUtils {
|
||||
}
|
||||
return true;
|
||||
} catch (Exception ignore) {
|
||||
// ignore
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@ -71,29 +75,29 @@ public class WorkflowDAGUtils {
|
||||
/**
|
||||
* 将点线表示法的DAG图转化为引用表达法的DAG图
|
||||
*
|
||||
* @param PEWorkflowDAG 点线表示法的DAG图
|
||||
* @param peWorkflowDAG 点线表示法的DAG图
|
||||
* @return 引用表示法的DAG图
|
||||
*/
|
||||
public static WorkflowDAG convert(PEWorkflowDAG PEWorkflowDAG) {
|
||||
public static WorkflowDAG convert(PEWorkflowDAG peWorkflowDAG) {
|
||||
Set<Long> rootIds = Sets.newHashSet();
|
||||
Map<Long, WorkflowDAG.Node> id2Node = Maps.newHashMap();
|
||||
|
||||
if (PEWorkflowDAG.getNodes() == null || PEWorkflowDAG.getNodes().isEmpty()) {
|
||||
if (peWorkflowDAG.getNodes() == null || peWorkflowDAG.getNodes().isEmpty()) {
|
||||
throw new PowerJobException("empty graph");
|
||||
}
|
||||
|
||||
// 创建节点
|
||||
PEWorkflowDAG.getNodes().forEach(node -> {
|
||||
Long jobId = node.getJobId();
|
||||
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), jobId, node.getJobName(), InstanceStatus.WAITING_DISPATCH.getV());
|
||||
id2Node.put(jobId, n);
|
||||
peWorkflowDAG.getNodes().forEach(node -> {
|
||||
Long nodeId = node.getNodeId();
|
||||
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), node.getJobId(), node.getJobName(), InstanceStatus.WAITING_DISPATCH.getV());
|
||||
id2Node.put(nodeId, n);
|
||||
|
||||
// 初始阶段,每一个点都设为顶点
|
||||
rootIds.add(jobId);
|
||||
rootIds.add(nodeId);
|
||||
});
|
||||
|
||||
// 连接图像
|
||||
PEWorkflowDAG.getEdges().forEach(edge -> {
|
||||
peWorkflowDAG.getEdges().forEach(edge -> {
|
||||
WorkflowDAG.Node from = id2Node.get(edge.getFrom());
|
||||
WorkflowDAG.Node to = id2Node.get(edge.getTo());
|
||||
|
||||
@ -104,12 +108,12 @@ public class WorkflowDAGUtils {
|
||||
from.getSuccessors().add(to);
|
||||
|
||||
// 被连接的点不可能成为 root,移除
|
||||
rootIds.remove(to.getJobId());
|
||||
rootIds.remove(to.getNodeId());
|
||||
});
|
||||
|
||||
// 合法性校验(至少存在一个顶点)
|
||||
if (rootIds.size() < 1) {
|
||||
throw new PowerJobException("Illegal DAG: " + JsonUtils.toJSONString(PEWorkflowDAG));
|
||||
if (rootIds.isEmpty()) {
|
||||
throw new PowerJobException("Illegal DAG: " + JsonUtils.toJSONString(peWorkflowDAG));
|
||||
}
|
||||
|
||||
List<WorkflowDAG.Node> roots = Lists.newLinkedList();
|
||||
@ -121,13 +125,13 @@ public class WorkflowDAGUtils {
|
||||
private static boolean invalidPath(WorkflowDAG.Node root, Set<Long> ids) {
|
||||
|
||||
// 递归出口(出现之前的节点则代表有环,失败;出现无后继者节点,则说明该路径成功)
|
||||
if (ids.contains(root.getJobId())) {
|
||||
if (ids.contains(root.getNodeId())) {
|
||||
return true;
|
||||
}
|
||||
if (root.getSuccessors().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
ids.add(root.getJobId());
|
||||
ids.add(root.getNodeId());
|
||||
for (WorkflowDAG.Node node : root.getSuccessors()) {
|
||||
if (invalidPath(node, Sets.newHashSet(ids))) {
|
||||
return true;
|
||||
|
@ -38,6 +38,13 @@ public class InstanceInfoDO {
|
||||
* 任务所属应用的ID,冗余提高查询效率
|
||||
*/
|
||||
private Long instanceId;
|
||||
/**
|
||||
* 任务参数(静态)
|
||||
* @since 2021/2/01
|
||||
*/
|
||||
@Lob
|
||||
@Column
|
||||
private String jobParams;
|
||||
/**
|
||||
* 任务实例参数(动态)
|
||||
*/
|
||||
|
@ -6,6 +6,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
import javax.transaction.Transactional;
|
||||
import java.util.Date;
|
||||
@ -29,49 +30,83 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
|
||||
|
||||
/**
|
||||
* 更新任务执行记录内容(DispatchService专用)
|
||||
* @param instanceId 任务实例ID,分布式唯一
|
||||
* @param status 任务实例运行状态
|
||||
* @param runningTimes 运行次数
|
||||
*
|
||||
* @param instanceId 实例 ID
|
||||
* @param status 状态
|
||||
* @param actualTriggerTime 实际调度时间
|
||||
* @param finishedTime 完成时间
|
||||
* @param taskTrackerAddress taskTracker 地址
|
||||
* @param result 结果
|
||||
* @return 更新数量
|
||||
* @param modifyTime 更新时间
|
||||
* @return 更新记录数量
|
||||
*/
|
||||
@Transactional
|
||||
@Transactional(rollbackOn = Exception.class)
|
||||
@Modifying
|
||||
@CanIgnoreReturnValue
|
||||
@Query(value = "update InstanceInfoDO set status = ?2, runningTimes = ?3, actualTriggerTime = ?4, finishedTime = ?5, taskTrackerAddress = ?6, result = ?7, instanceParams = ?8, gmtModified = ?9 where instanceId = ?1")
|
||||
int update4TriggerFailed(long instanceId, int status, long runningTimes, long actualTriggerTime, long finishedTime, String taskTrackerAddress, String result, String instanceParams, Date modifyTime);
|
||||
@Query(value = "update InstanceInfoDO set status = :status, actualTriggerTime = :actualTriggerTime, finishedTime = :finishedTime, taskTrackerAddress = :taskTrackerAddress, result = :result, gmtModified = :modifyTime where instanceId = :instanceId")
|
||||
int update4TriggerFailed(@Param("instanceId") long instanceId, @Param("status") int status, @Param("actualTriggerTime") long actualTriggerTime, @Param("finishedTime") long finishedTime, @Param("taskTrackerAddress") String taskTrackerAddress, @Param("result") String result, @Param("modifyTime") Date modifyTime);
|
||||
|
||||
@Transactional
|
||||
/**
|
||||
* 更新任务执行记录内容(DispatchService专用)
|
||||
*
|
||||
* @param instanceId 任务实例ID,分布式唯一
|
||||
* @param status 状态
|
||||
* @param actualTriggerTime 实际调度时间
|
||||
* @param taskTrackerAddress taskTracker 地址
|
||||
* @param modifyTime 更新时间
|
||||
* @return 更新记录数量
|
||||
*/
|
||||
@Transactional(rollbackOn = Exception.class)
|
||||
@Modifying
|
||||
@CanIgnoreReturnValue
|
||||
@Query(value = "update InstanceInfoDO set status = ?2, runningTimes = ?3, actualTriggerTime = ?4, taskTrackerAddress = ?5, instanceParams = ?6, gmtModified = ?7 where instanceId = ?1")
|
||||
int update4TriggerSucceed(long instanceId, int status, long runningTimes, long actualTriggerTime, String taskTrackerAddress, String instanceParams, Date modifyTime);
|
||||
@Query(value = "update InstanceInfoDO set status = :status, actualTriggerTime = :actualTriggerTime, taskTrackerAddress = :taskTrackerAddress, gmtModified = :modifyTime where instanceId = :instanceId")
|
||||
int update4TriggerSucceed(@Param("instanceId") long instanceId, @Param("status") int status, @Param("actualTriggerTime") long actualTriggerTime, @Param("taskTrackerAddress") String taskTrackerAddress, @Param("modifyTime") Date modifyTime);
|
||||
|
||||
/**
|
||||
* 更新固定频率任务的执行记录
|
||||
*
|
||||
* @param instanceId 任务实例ID,分布式唯一
|
||||
* @param status 状态
|
||||
* @param runningTimes 执行次数
|
||||
* @param modifyTime 更新时间
|
||||
* @return 更新记录数量
|
||||
*/
|
||||
@Modifying
|
||||
@Transactional
|
||||
@Transactional(rollbackOn = Exception.class)
|
||||
@CanIgnoreReturnValue
|
||||
@Query(value = "update InstanceInfoDO set status = ?2, runningTimes = ?3, gmtModified = ?4 where instanceId = ?1")
|
||||
int update4FrequentJob(long instanceId, int status, long runningTimes, Date modifyTime);
|
||||
@Query(value = "update InstanceInfoDO set status = :status, runningTimes = :runningTimes, gmtModified = :modifyTime where instanceId = :instanceId")
|
||||
int update4FrequentJob(@Param("instanceId") long instanceId, @Param("status") int status, @Param("runningTimes") long runningTimes, @Param("modifyTime") Date modifyTime);
|
||||
|
||||
/* --状态检查三兄弟,对应 WAITING_DISPATCH 、 WAITING_WORKER_RECEIVE 和 RUNNING 三阶段,数据量一般不大,就不单独写SQL优化 IO 了-- */
|
||||
|
||||
// 状态检查三兄弟,对应 WAITING_DISPATCH 、 WAITING_WORKER_RECEIVE 和 RUNNING 三阶段
|
||||
// 数据量一般不大,就不单独写SQL优化 IO 了
|
||||
List<InstanceInfoDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> jobIds, int status, long time);
|
||||
|
||||
List<InstanceInfoDO> findByAppIdInAndStatusAndActualTriggerTimeLessThan(List<Long> jobIds, int status, long time);
|
||||
|
||||
List<InstanceInfoDO> findByAppIdInAndStatusAndGmtModifiedBefore(List<Long> jobIds, int status, Date time);
|
||||
|
||||
|
||||
InstanceInfoDO findByInstanceId(long instanceId);
|
||||
|
||||
// 数据统计
|
||||
/* --数据统计-- */
|
||||
|
||||
long countByAppIdAndStatus(long appId, int status);
|
||||
|
||||
long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time);
|
||||
|
||||
@Query(value = "select distinct jobId from InstanceInfoDO where jobId in ?1 and status in ?2")
|
||||
List<Long> findByJobIdInAndStatusIn(List<Long> jobIds, List<Integer> status);
|
||||
|
||||
// 删除历史数据,JPA自带的删除居然是根据ID循环删,2000条数据删了几秒,也太拉垮了吧...
|
||||
// 结果只能用 int 接收
|
||||
/**
|
||||
* 删除历史数据,JPA自带的删除居然是根据ID循环删,2000条数据删了几秒,也太拉垮了吧...
|
||||
* 结果只能用 int 接收
|
||||
*
|
||||
* @param time 更新时间阈值
|
||||
* @param status 状态
|
||||
* @return 删除记录数
|
||||
*/
|
||||
@Modifying
|
||||
@Transactional
|
||||
@Transactional(rollbackOn = Exception.class)
|
||||
@Query(value = "delete from InstanceInfoDO where gmtModified < ?1 and status in ?2")
|
||||
int deleteAllByGmtModifiedBeforeAndStatusIn(Date time, List<Integer> status);
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* JobInfo 数据访问层
|
||||
@ -18,7 +19,9 @@ import java.util.List;
|
||||
public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long>, JpaSpecificationExecutor<JobInfoDO> {
|
||||
|
||||
|
||||
// 调度专用
|
||||
/**
|
||||
* 调度专用
|
||||
*/
|
||||
List<JobInfoDO> findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List<Long> appIds, int status, int timeExpressionType, long time);
|
||||
|
||||
@Query(value = "select id from JobInfoDO where appId in ?1 and status = ?2 and timeExpressionType in ?3")
|
||||
@ -28,8 +31,14 @@ public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long>, JpaSp
|
||||
|
||||
Page<JobInfoDO> findByAppIdAndJobNameLikeAndStatusNot(Long appId, String condition, int status, Pageable pageable);
|
||||
|
||||
// 校验工作流包含的任务
|
||||
long countByAppIdAndStatusAndIdIn(Long appId, int status, List<Long> jobIds);
|
||||
/**
|
||||
* 校验工作流包含的任务
|
||||
* @param appId APP ID
|
||||
* @param status 状态
|
||||
* @param jobIds 任务ID
|
||||
* @return 数量
|
||||
*/
|
||||
long countByAppIdAndStatusAndIdIn(Long appId, int status, Set<Long> jobIds);
|
||||
|
||||
long countByAppIdAndStatusNot(long appId, int status);
|
||||
|
||||
|
@ -49,40 +49,49 @@ public class DispatchService {
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
private static final Splitter commaSplitter = Splitter.on(",");
|
||||
private static final Splitter COMMA_SPLITTER = Splitter.on(",");
|
||||
|
||||
@UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024)
|
||||
public void redispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) {
|
||||
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
|
||||
dispatch(jobInfo, instanceId, currentRunningTimes, instanceInfo.getInstanceParams(), instanceInfo.getWfInstanceId());
|
||||
public void redispatch(JobInfoDO jobInfo, long instanceId) {
|
||||
// 这里暂时保留
|
||||
dispatch(jobInfo, instanceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将任务从Server派发到Worker(TaskTracker)
|
||||
* @param jobInfo 任务的元信息
|
||||
* **************************************************
|
||||
* 2021-02-03 modify by Echo009
|
||||
* 1、移除参数 当前运行次数、工作流实例ID、实例参数
|
||||
* 更改为从当前任务实例中获取以上信息
|
||||
* 2、移除运行次数相关的(runningTimes)处理逻辑
|
||||
* 迁移至 {@link InstanceManager#updateStatus} 中处理
|
||||
* **************************************************
|
||||
*
|
||||
* @param jobInfo 任务的元信息,注意这里传入的 jobInfo 可能为空对象
|
||||
* @param instanceId 任务实例ID
|
||||
* @param currentRunningTimes 当前运行的次数
|
||||
* @param instanceParams 实例的运行参数,API触发方式专用
|
||||
* @param wfInstanceId 工作流任务实例ID,workflow 任务专用
|
||||
*/
|
||||
@UseSegmentLock(type = "dispatch", key = "#jobInfo.getId().intValue()", concurrencyLevel = 1024)
|
||||
public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes, String instanceParams, Long wfInstanceId) {
|
||||
Long jobId = jobInfo.getId();
|
||||
log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, instanceParams);
|
||||
|
||||
Date now = new Date();
|
||||
String dbInstanceParams = instanceParams == null ? "" : instanceParams;
|
||||
|
||||
public void dispatch(JobInfoDO jobInfo, long instanceId) {
|
||||
// 检查当前任务是否被取消
|
||||
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
|
||||
Long jobId = instanceInfo.getId();
|
||||
if (CANCELED.getV() == instanceInfo.getStatus()) {
|
||||
log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been canceled", jobId, instanceId);
|
||||
return;
|
||||
}
|
||||
// 任务信息已经被删除
|
||||
if (jobInfo.getId() == null) {
|
||||
log.warn("[Dispatcher-{}|{}] cancel dispatch due to job(id={}) has been deleted!", jobId, instanceId, jobId);
|
||||
instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, "can't find job by id " + jobId);
|
||||
return;
|
||||
}
|
||||
|
||||
Date now = new Date();
|
||||
String dbInstanceParams = instanceInfo.getInstanceParams() == null ? "" : instanceInfo.getInstanceParams();
|
||||
log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, dbInstanceParams);
|
||||
|
||||
// 查询当前运行的实例数
|
||||
long current = System.currentTimeMillis();
|
||||
|
||||
Integer maxInstanceNum = jobInfo.getMaxInstanceNum();
|
||||
// 秒级任务只派发到一台机器,具体的 maxInstanceNum 由 TaskTracker 控制
|
||||
if (TimeExpressionType.frequentTypes.contains(jobInfo.getTimeExpressionType())) {
|
||||
@ -91,7 +100,6 @@ public class DispatchService {
|
||||
|
||||
// 0 代表不限制在线任务,还能省去一次 DB 查询
|
||||
if (maxInstanceNum > 0) {
|
||||
|
||||
// 不统计 WAITING_DISPATCH 的状态:使用 OpenAPI 触发的延迟任务不应该统计进去(比如 delay 是 1 天)
|
||||
// 由于不统计 WAITING_DISPATCH,所以这个 runningInstanceCount 不包含本任务自身
|
||||
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()));
|
||||
@ -99,79 +107,100 @@ public class DispatchService {
|
||||
if (runningInstanceCount >= maxInstanceNum) {
|
||||
String result = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, runningInstanceCount, maxInstanceNum);
|
||||
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance is running ({} > {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum);
|
||||
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
|
||||
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, result, now);
|
||||
|
||||
instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);
|
||||
instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, result);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// 获取当前最合适的 worker 列表
|
||||
List<WorkerInfo> suitableWorkers = obtainSuitableWorkers(jobInfo);
|
||||
|
||||
// 获取当前所有可用的Worker
|
||||
List<WorkerInfo> allAvailableWorker = WorkerClusterManagerService.getSortedAvailableWorkers(jobInfo.getAppId(), jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
|
||||
|
||||
allAvailableWorker.removeIf(worker -> {
|
||||
// 空,则全部不过滤
|
||||
if (StringUtils.isEmpty(jobInfo.getDesignatedWorkers())) {
|
||||
return false;
|
||||
}
|
||||
// 非空,只有匹配上的 worker 才不被过滤
|
||||
Set<String> designatedWorkers = Sets.newHashSet(commaSplitter.splitToList(jobInfo.getDesignatedWorkers()));
|
||||
return !designatedWorkers.contains(worker.getAddress());
|
||||
});
|
||||
|
||||
if (CollectionUtils.isEmpty(allAvailableWorker)) {
|
||||
if (CollectionUtils.isEmpty(suitableWorkers)) {
|
||||
String clusterStatusDescription = WorkerClusterManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId());
|
||||
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription);
|
||||
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams, now);
|
||||
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, now);
|
||||
|
||||
instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
|
||||
instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
|
||||
return;
|
||||
}
|
||||
List<String> workerIpList = suitableWorkers.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
|
||||
|
||||
// 限定集群大小(0代表不限制)
|
||||
if (jobInfo.getMaxWorkerCount() > 0) {
|
||||
if (allAvailableWorker.size() > jobInfo.getMaxWorkerCount()) {
|
||||
allAvailableWorker = allAvailableWorker.subList(0, jobInfo.getMaxWorkerCount());
|
||||
}
|
||||
}
|
||||
List<String> workerIpList = allAvailableWorker.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
|
||||
// 构造任务调度请求
|
||||
ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList);
|
||||
|
||||
// 构造请求
|
||||
ServerScheduleJobReq req = new ServerScheduleJobReq();
|
||||
BeanUtils.copyProperties(jobInfo, req);
|
||||
// 传入 JobId
|
||||
req.setJobId(jobInfo.getId());
|
||||
// 传入 InstanceParams
|
||||
if (StringUtils.isEmpty(instanceParams)) {
|
||||
req.setInstanceParams(null);
|
||||
}else {
|
||||
req.setInstanceParams(instanceParams);
|
||||
}
|
||||
req.setInstanceId(instanceId);
|
||||
req.setAllWorkerAddress(workerIpList);
|
||||
|
||||
// 设置工作流ID
|
||||
req.setWfInstanceId(wfInstanceId);
|
||||
|
||||
req.setExecuteType(ExecuteType.of(jobInfo.getExecuteType()).name());
|
||||
req.setProcessorType(ProcessorType.of(jobInfo.getProcessorType()).name());
|
||||
req.setTimeExpressionType(TimeExpressionType.of(jobInfo.getTimeExpressionType()).name());
|
||||
|
||||
req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit());
|
||||
|
||||
req.setThreadConcurrency(jobInfo.getConcurrency());
|
||||
|
||||
// 发送请求(不可靠,需要一个后台线程定期轮询状态)
|
||||
WorkerInfo taskTracker = allAvailableWorker.get(0);
|
||||
WorkerInfo taskTracker = suitableWorkers.get(0);
|
||||
String taskTrackerAddress = taskTracker.getAddress();
|
||||
|
||||
transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req);
|
||||
log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req);
|
||||
|
||||
// 修改状态
|
||||
instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, dbInstanceParams, now);
|
||||
instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now);
|
||||
|
||||
// 装载缓存
|
||||
instanceMetadataService.loadJobInfo(instanceId, jobInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前最合适的 worker 列表
|
||||
*/
|
||||
private List<WorkerInfo> obtainSuitableWorkers(JobInfoDO jobInfo) {
|
||||
// 获取当前所有可用的Worker
|
||||
List<WorkerInfo> allAvailableWorker = WorkerClusterManagerService.getSortedAvailableWorkers(jobInfo.getAppId(), jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
|
||||
|
||||
// 筛选指定的机器
|
||||
allAvailableWorker.removeIf(worker -> {
|
||||
// 空,则全部不过滤
|
||||
if (StringUtils.isEmpty(jobInfo.getDesignatedWorkers())) {
|
||||
return false;
|
||||
}
|
||||
// 非空,只有匹配上的 worker 才不被过滤
|
||||
Set<String> designatedWorkers = Sets.newHashSet(COMMA_SPLITTER.splitToList(jobInfo.getDesignatedWorkers()));
|
||||
return !designatedWorkers.contains(worker.getAddress());
|
||||
});
|
||||
|
||||
|
||||
// 限定集群大小(0代表不限制)
|
||||
if (!allAvailableWorker.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && allAvailableWorker.size() > jobInfo.getMaxWorkerCount()) {
|
||||
allAvailableWorker = allAvailableWorker.subList(0, jobInfo.getMaxWorkerCount());
|
||||
}
|
||||
return allAvailableWorker;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造任务调度请求
|
||||
*/
|
||||
private ServerScheduleJobReq constructServerScheduleJobReq(JobInfoDO jobInfo, InstanceInfoDO instanceInfo, List<String> finalWorkersIpList) {
|
||||
// 构造请求
|
||||
ServerScheduleJobReq req = new ServerScheduleJobReq();
|
||||
BeanUtils.copyProperties(jobInfo, req);
|
||||
// 传入 JobId
|
||||
req.setJobId(jobInfo.getId());
|
||||
// 传入 InstanceParams
|
||||
if (StringUtils.isEmpty(instanceInfo.getInstanceParams())) {
|
||||
req.setInstanceParams(null);
|
||||
} else {
|
||||
req.setInstanceParams(instanceInfo.getInstanceParams());
|
||||
}
|
||||
// 覆盖静态参数
|
||||
if (!StringUtils.isEmpty(instanceInfo.getJobParams())) {
|
||||
req.setJobParams(instanceInfo.getJobParams());
|
||||
}
|
||||
req.setInstanceId(instanceInfo.getInstanceId());
|
||||
req.setAllWorkerAddress(finalWorkersIpList);
|
||||
|
||||
// 设置工作流ID
|
||||
req.setWfInstanceId(instanceInfo.getWfInstanceId());
|
||||
|
||||
req.setExecuteType(ExecuteType.of(jobInfo.getExecuteType()).name());
|
||||
req.setProcessorType(ProcessorType.of(jobInfo.getProcessorType()).name());
|
||||
|
||||
req.setTimeExpressionType(TimeExpressionType.of(jobInfo.getTimeExpressionType()).name());
|
||||
req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit());
|
||||
req.setThreadConcurrency(jobInfo.getConcurrency());
|
||||
return req;
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.text.ParseException;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -52,11 +53,12 @@ public class JobService {
|
||||
|
||||
/**
|
||||
* 保存/修改任务
|
||||
*
|
||||
* @param request 任务请求
|
||||
* @return 创建的任务ID(jobId)
|
||||
* @throws Exception 异常
|
||||
* @throws ParseException 异常
|
||||
*/
|
||||
public Long saveJob(SaveJobInfoRequest request) throws Exception {
|
||||
public Long saveJob(SaveJobInfoRequest request) throws ParseException {
|
||||
|
||||
request.valid();
|
||||
|
||||
@ -107,6 +109,7 @@ public class JobService {
|
||||
|
||||
/**
|
||||
* 手动立即运行某个任务
|
||||
*
|
||||
* @param jobId 任务ID
|
||||
* @param instanceParams 任务实例参数(仅 OpenAPI 存在)
|
||||
* @param delay 延迟时间,单位 毫秒
|
||||
@ -119,14 +122,12 @@ public class JobService {
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
|
||||
|
||||
log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay);
|
||||
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
|
||||
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
|
||||
instanceInfoRepository.flush();
|
||||
if (delay <= 0) {
|
||||
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
|
||||
dispatchService.dispatch(jobInfo, instanceId);
|
||||
} else {
|
||||
InstanceTimeWheelService.schedule(instanceId, delay, () -> {
|
||||
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
|
||||
});
|
||||
InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfo, instanceId));
|
||||
}
|
||||
log.info("[Job-{}] run job successfully, params={}, instanceId={}", jobInfo.getId(), instanceParams, instanceId);
|
||||
return instanceId;
|
||||
@ -135,6 +136,7 @@ public class JobService {
|
||||
|
||||
/**
|
||||
* 删除某个任务
|
||||
*
|
||||
* @param jobId 任务ID
|
||||
*/
|
||||
public void deleteJob(Long jobId) {
|
||||
@ -150,10 +152,11 @@ public class JobService {
|
||||
|
||||
/**
|
||||
* 启用某个任务
|
||||
*
|
||||
* @param jobId 任务ID
|
||||
* @throws Exception 异常(CRON表达式错误)
|
||||
* @throws ParseException 异常(CRON表达式错误)
|
||||
*/
|
||||
public void enableJob(Long jobId) throws Exception {
|
||||
public void enableJob(Long jobId) throws ParseException {
|
||||
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
|
||||
|
||||
jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());
|
||||
@ -166,7 +169,7 @@ public class JobService {
|
||||
* 停止或删除某个JOB
|
||||
* 秒级任务还要额外停止正在运行的任务实例
|
||||
*/
|
||||
private void shutdownOrStopJob(Long jobId, SwitchableStatus status) throws IllegalArgumentException {
|
||||
private void shutdownOrStopJob(Long jobId, SwitchableStatus status) {
|
||||
|
||||
// 1. 先更新 job_info 表
|
||||
Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);
|
||||
@ -194,11 +197,12 @@ public class JobService {
|
||||
// 重复查询了数据库,不过问题不大,这个调用量很小
|
||||
instanceService.stopInstance(instance.getInstanceId());
|
||||
} catch (Exception ignore) {
|
||||
// ignore exception
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws Exception {
|
||||
private void calculateNextTriggerTime(JobInfoDO jobInfoDO) throws ParseException {
|
||||
// 计算下次调度时间
|
||||
Date now = new Date();
|
||||
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
|
||||
|
@ -22,6 +22,7 @@ import org.springframework.stereotype.Service;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
@ -48,9 +49,15 @@ public class InstanceManager {
|
||||
|
||||
/**
|
||||
* 更新任务状态
|
||||
* ********************************************
|
||||
* 2021-02-03 modify by Echo009
|
||||
* 实例的执行次数统一在这里管理,对于非固定频率的任务
|
||||
* 当 db 中实例的状态为等待 Worker 接收(WAITING_WORKER_RECEIVE)时,runningTimes + 1
|
||||
* ********************************************
|
||||
*
|
||||
* @param req TaskTracker上报任务实例状态的请求
|
||||
*/
|
||||
public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws Exception {
|
||||
public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws ExecutionException {
|
||||
|
||||
Long instanceId = req.getInstanceId();
|
||||
|
||||
@ -66,26 +73,15 @@ public class InstanceManager {
|
||||
log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will be dropped.", instanceId, req);
|
||||
return;
|
||||
}
|
||||
|
||||
// 丢弃非目标 TaskTracker 的上报数据(脑裂情况)
|
||||
if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) {
|
||||
log.warn("[InstanceManager-{}] receive the other TaskTracker's report: {}, but current TaskTracker is {}, this report will br dropped.", instanceId, req, instanceInfo.getTaskTrackerAddress());
|
||||
return;
|
||||
}
|
||||
|
||||
// 如果任务已经结束,直接丢弃该请求(StopInstance 和 ReportStatus 同时发送的情况)
|
||||
if (InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) {
|
||||
log.info("[InstanceManager-{}] instance already finished, this report[{}] will be dropped!", instanceId, req);
|
||||
return;
|
||||
}
|
||||
|
||||
InstanceStatus newStatus = InstanceStatus.of(req.getInstanceStatus());
|
||||
InstanceStatus receivedInstanceStatus = InstanceStatus.of(req.getInstanceStatus());
|
||||
Integer timeExpressionType = jobInfo.getTimeExpressionType();
|
||||
|
||||
instanceInfo.setStatus(newStatus.getV());
|
||||
instanceInfo.setLastReportTime(req.getReportTime());
|
||||
instanceInfo.setGmtModified(new Date());
|
||||
|
||||
// FREQUENT 任务没有失败重试机制,TaskTracker一直运行即可,只需要将存活信息同步到DB即可
|
||||
// FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行)
|
||||
// 综上,直接把 status 和 runningNum 同步到DB即可
|
||||
@ -96,14 +92,22 @@ public class InstanceManager {
|
||||
instanceInfoRepository.saveAndFlush(instanceInfo);
|
||||
return;
|
||||
}
|
||||
// 更新运行次数
|
||||
if (instanceInfo.getStatus() == InstanceStatus.WAITING_WORKER_RECEIVE.getV()) {
|
||||
// 这里不会存在并发问题
|
||||
instanceInfo.setRunningTimes(instanceInfo.getRunningTimes() + 1);
|
||||
}
|
||||
instanceInfo.setStatus(receivedInstanceStatus.getV());
|
||||
instanceInfo.setLastReportTime(req.getReportTime());
|
||||
instanceInfo.setGmtModified(new Date());
|
||||
|
||||
boolean finished = false;
|
||||
if (newStatus == InstanceStatus.SUCCEED) {
|
||||
if (receivedInstanceStatus == InstanceStatus.SUCCEED) {
|
||||
instanceInfo.setResult(req.getResult());
|
||||
instanceInfo.setFinishedTime(System.currentTimeMillis());
|
||||
|
||||
finished = true;
|
||||
}else if (newStatus == InstanceStatus.FAILED) {
|
||||
} else if (receivedInstanceStatus == InstanceStatus.FAILED) {
|
||||
|
||||
// 当前重试次数 <= 最大重试次数,进行重试 (第一次运行,runningTimes为1,重试一次,instanceRetryNum也为1,故需要 =)
|
||||
if (instanceInfo.getRunningTimes() <= jobInfo.getInstanceRetryNum()) {
|
||||
@ -111,9 +115,7 @@ public class InstanceManager {
|
||||
log.info("[InstanceManager-{}] instance execute failed but will take the {}th retry.", instanceId, instanceInfo.getRunningTimes());
|
||||
|
||||
// 延迟10S重试(由于重试不改变 instanceId,如果派发到同一台机器,上一个 TaskTracker 还处于资源释放阶段,无法创建新的TaskTracker,任务失败)
|
||||
HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> {
|
||||
dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes());
|
||||
}, 10, TimeUnit.SECONDS);
|
||||
HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> dispatchService.redispatch(jobInfo, instanceId), 10, TimeUnit.SECONDS);
|
||||
|
||||
// 修改状态为 等待派发,正式开始重试
|
||||
// 问题:会丢失以往的调度记录(actualTriggerTime什么的都会被覆盖)
|
||||
@ -131,12 +133,13 @@ public class InstanceManager {
|
||||
|
||||
if (finished) {
|
||||
// 这里的 InstanceStatus 只有 成功/失败 两种,手动停止不会由 TaskTracker 上报
|
||||
processFinishedInstance(instanceId, req.getWfInstanceId(), newStatus, req.getResult());
|
||||
processFinishedInstance(instanceId, req.getWfInstanceId(), receivedInstanceStatus, req.getResult());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 收尾完成的任务实例
|
||||
*
|
||||
* @param instanceId 任务实例ID
|
||||
* @param wfInstanceId 工作流实例ID,非必须
|
||||
* @param status 任务状态,有 成功/失败/手动停止
|
||||
|
@ -57,14 +57,20 @@ public class InstanceService {
|
||||
|
||||
/**
|
||||
* 创建任务实例(注意,该方法并不调用 saveAndFlush,如果有需要立即同步到DB的需求,请在方法结束后手动调用 flush)
|
||||
* ********************************************
|
||||
* 2021-02-03 modify by Echo009
|
||||
* 新增 jobParams ,每次均记录任务静态参数
|
||||
* ********************************************
|
||||
*
|
||||
* @param jobId 任务ID
|
||||
* @param appId 所属应用ID
|
||||
* @param instanceParams 任务实例参数,仅 OpenAPI 创建时存在
|
||||
* @param jobParams 任务静态参数
|
||||
* @param instanceParams 任务实例参数,仅 OpenAPI 创建 或者 工作流任务 时存在
|
||||
* @param wfInstanceId 工作流任务实例ID,仅工作流下的任务实例存在
|
||||
* @param expectTriggerTime 预期执行时间
|
||||
* @return 任务实例ID
|
||||
*/
|
||||
public Long create(Long jobId, Long appId, String instanceParams, Long wfInstanceId, Long expectTriggerTime) {
|
||||
public Long create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime) {
|
||||
|
||||
Long instanceId = idGenerateService.allocate();
|
||||
Date now = new Date();
|
||||
@ -73,11 +79,13 @@ public class InstanceService {
|
||||
newInstanceInfo.setJobId(jobId);
|
||||
newInstanceInfo.setAppId(appId);
|
||||
newInstanceInfo.setInstanceId(instanceId);
|
||||
newInstanceInfo.setJobParams(jobParams);
|
||||
newInstanceInfo.setInstanceParams(instanceParams);
|
||||
newInstanceInfo.setType(wfInstanceId == null ? InstanceType.NORMAL.getV() : InstanceType.WORKFLOW.getV());
|
||||
newInstanceInfo.setWfInstanceId(wfInstanceId);
|
||||
|
||||
newInstanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
|
||||
newInstanceInfo.setRunningTimes(0L);
|
||||
newInstanceInfo.setExpectedTriggerTime(expectTriggerTime);
|
||||
newInstanceInfo.setLastReportTime(-1L);
|
||||
newInstanceInfo.setGmtCreate(now);
|
||||
@ -89,6 +97,7 @@ public class InstanceService {
|
||||
|
||||
/**
|
||||
* 停止任务实例
|
||||
*
|
||||
* @param instanceId 任务实例ID
|
||||
*/
|
||||
public void stopInstance(Long instanceId) {
|
||||
@ -135,6 +144,7 @@ public class InstanceService {
|
||||
|
||||
/**
|
||||
* 重试任务(只有结束的任务允许重试)
|
||||
*
|
||||
* @param instanceId 任务实例ID
|
||||
*/
|
||||
@DesignateServer(appIdParameterName = "appId")
|
||||
@ -162,12 +172,13 @@ public class InstanceService {
|
||||
// 派发任务
|
||||
Long jobId = instanceInfo.getJobId();
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new PowerJobException("can't find job info by jobId: " + jobId));
|
||||
dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes());
|
||||
dispatchService.redispatch(jobInfo, instanceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消任务实例的运行
|
||||
* 接口使用条件:调用接口时间与待取消任务的预计执行时间有一定时间间隔,否则不保证可靠性!
|
||||
*
|
||||
* @param instanceId 任务实例
|
||||
*/
|
||||
public void cancelInstance(Long instanceId) {
|
||||
@ -215,6 +226,7 @@ public class InstanceService {
|
||||
|
||||
/**
|
||||
* 获取任务实例的信息
|
||||
*
|
||||
* @param instanceId 任务实例ID
|
||||
* @return 任务实例的信息
|
||||
*/
|
||||
@ -224,6 +236,7 @@ public class InstanceService {
|
||||
|
||||
/**
|
||||
* 获取任务实例的状态
|
||||
*
|
||||
* @param instanceId 任务实例ID
|
||||
* @return 任务实例的状态
|
||||
*/
|
||||
@ -234,6 +247,7 @@ public class InstanceService {
|
||||
|
||||
/**
|
||||
* 获取任务实例的详细运行状态
|
||||
*
|
||||
* @param instanceId 任务实例ID
|
||||
* @return 详细运行状态
|
||||
*/
|
||||
|
@ -83,15 +83,26 @@ public class InstanceStatusCheckService {
|
||||
|
||||
/**
|
||||
* 检查任务实例的状态,发现异常及时重试,包括
|
||||
* WAITING_DISPATCH 超时:写入时间伦但为调度前 server down
|
||||
* WAITING_DISPATCH 超时:写入时间轮但未调度前 server down
|
||||
* WAITING_WORKER_RECEIVE 超时:由于网络错误导致 worker 未接受成功
|
||||
* RUNNING 超时:TaskTracker down,断开与 server 的心跳连接
|
||||
*
|
||||
* @param allAppIds 本系统所承担的所有 appIds
|
||||
*/
|
||||
private void checkInstance(List<Long> allAppIds) {
|
||||
|
||||
Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> {
|
||||
// 1. 检查等待 WAITING_DISPATCH 状态的任务
|
||||
handleWaitingDispatchInstance(partAppIds);
|
||||
// 2. 检查 WAITING_WORKER_RECEIVE 状态的任务
|
||||
handleWaitingWorkerReceiveInstance(partAppIds);
|
||||
// 3. 检查 RUNNING 状态的任务(一定时间内没收到 TaskTracker 的状态报告,视为失败)
|
||||
handleRunningInstance(partAppIds);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private void handleWaitingDispatchInstance(List<Long> partAppIds) {
|
||||
// 1. 检查等待 WAITING_DISPATCH 状态的任务
|
||||
long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
|
||||
List<InstanceInfoDO> waitingDispatchInstances = instanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold);
|
||||
@ -107,28 +118,32 @@ public class InstanceStatusCheckService {
|
||||
|
||||
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(instance.getJobId());
|
||||
if (jobInfoOpt.isPresent()) {
|
||||
dispatchService.redispatch(jobInfoOpt.get(), instance.getInstanceId(), 0);
|
||||
dispatchService.redispatch(jobInfoOpt.get(), instance.getInstanceId());
|
||||
} else {
|
||||
log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance);
|
||||
updateFailedInstance(instance, SystemInstanceResult.CAN_NOT_FIND_JOB_INFO);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void handleWaitingWorkerReceiveInstance(List<Long> partAppIds) {
|
||||
// 2. 检查 WAITING_WORKER_RECEIVE 状态的任务
|
||||
threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
|
||||
long threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
|
||||
List<InstanceInfoDO> waitingWorkerReceiveInstances = instanceInfoRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
|
||||
if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) {
|
||||
log.warn("[InstanceStatusChecker] find one instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances);
|
||||
waitingWorkerReceiveInstances.forEach(instance -> {
|
||||
// 重新派发
|
||||
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new);
|
||||
dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), 0);
|
||||
dispatchService.redispatch(jobInfoDO, instance.getInstanceId());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void handleRunningInstance(List<Long> partAppIds) {
|
||||
// 3. 检查 RUNNING 状态的任务(一定时间没收到 TaskTracker 的状态报告,视为失败)
|
||||
threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
|
||||
long threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
|
||||
List<InstanceInfoDO> failedInstances = instanceInfoRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold));
|
||||
if (!CollectionUtils.isEmpty(failedInstances)) {
|
||||
log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances);
|
||||
@ -146,19 +161,19 @@ public class InstanceStatusCheckService {
|
||||
|
||||
// CRON 和 API一样,失败次数 + 1,根据重试配置进行重试
|
||||
if (instance.getRunningTimes() < jobInfoDO.getInstanceRetryNum()) {
|
||||
dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes());
|
||||
dispatchService.redispatch(jobInfoDO, instance.getInstanceId());
|
||||
} else {
|
||||
updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT);
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 定期检查工作流实例状态
|
||||
* 此处仅检查并重试长时间处于 WAITING 状态的工作流实例,工作流的其他可靠性由 Instance 支撑,即子任务失败会反馈给 WorkflowInstance
|
||||
*
|
||||
* @param allAppIds 本系统所承担的所有 appIds
|
||||
*/
|
||||
private void checkWorkflowInstance(List<Long> allAppIds) {
|
||||
@ -175,7 +190,7 @@ public class InstanceStatusCheckService {
|
||||
waitingWfInstanceList.forEach(wfInstance -> {
|
||||
Optional<WorkflowInfoDO> workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId());
|
||||
workflowOpt.ifPresent(workflowInfo -> {
|
||||
workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId(), wfInstance.getWfInitParams());
|
||||
workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId());
|
||||
log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId());
|
||||
});
|
||||
});
|
||||
@ -184,7 +199,7 @@ public class InstanceStatusCheckService {
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理上报超时而失败的任务实例
|
||||
* 处理失败的任务实例
|
||||
*/
|
||||
private void updateFailedInstance(InstanceInfoDO instance, String result) {
|
||||
|
||||
@ -196,6 +211,6 @@ public class InstanceStatusCheckService {
|
||||
instance.setResult(result);
|
||||
instanceInfoRepository.saveAndFlush(instance);
|
||||
|
||||
instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, SystemInstanceResult.REPORT_TIMEOUT);
|
||||
instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, result);
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.text.ParseException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -45,7 +46,9 @@ import java.util.stream.Collectors;
|
||||
@Service
|
||||
public class OmsScheduleService {
|
||||
|
||||
// 每次并发调度的应用数量
|
||||
/**
|
||||
* 每次并发调度的应用数量
|
||||
*/
|
||||
private static final int MAX_APP_NUM = 10;
|
||||
|
||||
@Resource
|
||||
@ -142,7 +145,7 @@ public class OmsScheduleService {
|
||||
log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos);
|
||||
|
||||
jobInfos.forEach(jobInfo -> {
|
||||
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), null, null, jobInfo.getNextTriggerTime());
|
||||
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime());
|
||||
jobId2InstanceId.put(jobInfo.getId(), instanceId);
|
||||
});
|
||||
instanceInfoRepository.flush();
|
||||
@ -160,9 +163,7 @@ public class OmsScheduleService {
|
||||
delay = targetTriggerTime - nowTime;
|
||||
}
|
||||
|
||||
InstanceTimeWheelService.schedule(instanceId, delay, () -> {
|
||||
dispatchService.dispatch(jobInfoDO, instanceId, 0, null, null);
|
||||
});
|
||||
InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfoDO, instanceId));
|
||||
});
|
||||
|
||||
// 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms)
|
||||
@ -204,7 +205,7 @@ public class OmsScheduleService {
|
||||
log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis());
|
||||
delay = 0;
|
||||
}
|
||||
InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, null));
|
||||
InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId));
|
||||
|
||||
// 3. 重新计算下一次调度时间并更新
|
||||
try {
|
||||
@ -252,7 +253,7 @@ public class OmsScheduleService {
|
||||
});
|
||||
}
|
||||
|
||||
private void refreshJob(JobInfoDO jobInfo) throws Exception {
|
||||
private void refreshJob(JobInfoDO jobInfo) throws ParseException {
|
||||
Date nextTriggerTime = calculateNextTriggerTime(jobInfo.getNextTriggerTime(), jobInfo.getTimeExpression());
|
||||
|
||||
JobInfoDO updatedJobInfo = new JobInfoDO();
|
||||
@ -269,7 +270,7 @@ public class OmsScheduleService {
|
||||
jobInfoRepository.save(updatedJobInfo);
|
||||
}
|
||||
|
||||
private void refreshWorkflow(WorkflowInfoDO wfInfo) throws Exception {
|
||||
private void refreshWorkflow(WorkflowInfoDO wfInfo) throws ParseException {
|
||||
Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression());
|
||||
|
||||
WorkflowInfoDO updateEntity = new WorkflowInfoDO();
|
||||
@ -288,12 +289,13 @@ public class OmsScheduleService {
|
||||
|
||||
/**
|
||||
* 计算下次触发时间
|
||||
*
|
||||
* @param preTriggerTime 前一次触发时间
|
||||
* @param cronExpression CRON 表达式
|
||||
* @return 下一次调度时间
|
||||
* @throws Exception 异常
|
||||
* @throws ParseException 异常
|
||||
*/
|
||||
private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws Exception {
|
||||
private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws ParseException {
|
||||
|
||||
CronExpression ce = new CronExpression(cronExpression);
|
||||
// 取最大值,防止长时间未调度任务被连续调度(原来DISABLE的任务突然被打开,不取最大值会补上过去所有的调度)
|
||||
|
@ -1,10 +1,7 @@
|
||||
package com.github.kfcfans.powerjob.server.service.workflow;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.github.kfcfans.powerjob.common.InstanceStatus;
|
||||
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
|
||||
import com.github.kfcfans.powerjob.common.TimeExpressionType;
|
||||
import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.github.kfcfans.powerjob.common.*;
|
||||
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
|
||||
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
|
||||
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
|
||||
@ -23,19 +20,13 @@ import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;
|
||||
import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm;
|
||||
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
|
||||
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
|
||||
import com.google.common.collect.LinkedListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.*;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 管理运行中的工作流实例
|
||||
@ -66,6 +57,11 @@ public class WorkflowInstanceManager {
|
||||
|
||||
/**
|
||||
* 创建工作流任务实例
|
||||
* ********************************************
|
||||
* 2021-02-03 modify by Echo009
|
||||
* 通过 initParams 初始化工作流上下文(wfContext)
|
||||
* ********************************************
|
||||
*
|
||||
* @param wfInfo 工作流任务元数据(描述信息)
|
||||
* @param initParams 启动参数
|
||||
* @param expectTriggerTime 预计执行时间
|
||||
@ -86,13 +82,17 @@ public class WorkflowInstanceManager {
|
||||
newWfInstance.setExpectedTriggerTime(expectTriggerTime);
|
||||
newWfInstance.setActualTriggerTime(System.currentTimeMillis());
|
||||
newWfInstance.setWfInitParams(initParams);
|
||||
// 初始化上下文
|
||||
Map<String, String> wfContextMap = Maps.newHashMap();
|
||||
wfContextMap.put(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, initParams);
|
||||
newWfInstance.setWfContext(JsonUtils.toJSONString(wfContextMap));
|
||||
|
||||
newWfInstance.setGmtCreate(now);
|
||||
newWfInstance.setGmtModified(now);
|
||||
|
||||
// 校验合法性(工作是否存在且启用)
|
||||
List<Long> allJobIds = Lists.newLinkedList();
|
||||
PEWorkflowDAG dag = JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
|
||||
Set<Long> allJobIds = Sets.newHashSet();
|
||||
PEWorkflowDAG dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
|
||||
dag.getNodes().forEach(node -> allJobIds.add(node.getJobId()));
|
||||
int needNum = allJobIds.size();
|
||||
long dbNum = jobInfoRepository.countByAppIdAndStatusAndIdIn(wfInfo.getAppId(), SwitchableStatus.ENABLE.getV(), allJobIds);
|
||||
@ -102,18 +102,24 @@ public class WorkflowInstanceManager {
|
||||
log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum);
|
||||
onWorkflowInstanceFailed(SystemInstanceResult.CAN_NOT_FIND_JOB, newWfInstance);
|
||||
} else {
|
||||
workflowInstanceInfoRepository.save(newWfInstance);
|
||||
workflowInstanceInfoRepository.saveAndFlush(newWfInstance);
|
||||
}
|
||||
return wfInstanceId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 开始任务
|
||||
* ********************************************
|
||||
* 2021-02-03 modify by Echo009
|
||||
* 1、工作流支持配置重复的任务节点
|
||||
* 2、移除参数 initParams,改为统一从工作流实例中获取
|
||||
* 传递工作流实例的 wfContext 作为 初始启动参数
|
||||
* ********************************************
|
||||
*
|
||||
* @param wfInfo 工作流任务信息
|
||||
* @param wfInstanceId 工作流任务实例ID
|
||||
* @param initParams 启动参数
|
||||
*/
|
||||
public void start(WorkflowInfoDO wfInfo, Long wfInstanceId, String initParams) {
|
||||
public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) {
|
||||
|
||||
Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
|
||||
if (!wfInstanceInfoOpt.isPresent()) {
|
||||
@ -124,7 +130,7 @@ public class WorkflowInstanceManager {
|
||||
|
||||
// 不是等待中,不再继续执行(可能上一流程已经失败)
|
||||
if (wfInstanceInfo.getStatus() != WorkflowInstanceStatus.WAITING.getV()) {
|
||||
log.info("[Workflow-{}|{}] workflowInstance({}) need't running any more.", wfInfo.getId(), wfInstanceId, wfInstanceInfo);
|
||||
log.info("[Workflow-{}|{}] workflowInstance({}) needn't running any more.", wfInfo.getId(), wfInstanceId, wfInstanceInfo);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -137,19 +143,23 @@ public class WorkflowInstanceManager {
|
||||
|
||||
try {
|
||||
|
||||
// 构建根任务启动参数(为了精简 worker 端实现,启动参数仍以 instanceParams 字段承接)
|
||||
Map<String, String> preJobId2Result = Maps.newHashMap();
|
||||
// 模拟 preJobId -> preJobResult 的格式,-1 代表前置任务不存在
|
||||
preJobId2Result.put("-1", initParams);
|
||||
String wfRootInstanceParams = JSONObject.toJSONString(preJobId2Result);
|
||||
|
||||
PEWorkflowDAG peWorkflowDAG = JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
|
||||
PEWorkflowDAG peWorkflowDAG = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
|
||||
List<PEWorkflowDAG.Node> roots = WorkflowDAGUtils.listRoots(peWorkflowDAG);
|
||||
|
||||
peWorkflowDAG.getNodes().forEach(node -> node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()));
|
||||
Map<Long, JobInfoDO> nodeId2JobInfoMap = Maps.newHashMap();
|
||||
// 创建所有的根任务
|
||||
roots.forEach(root -> {
|
||||
Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), wfRootInstanceParams, wfInstanceId, System.currentTimeMillis());
|
||||
// 注意:这里必须保证任务实例全部创建成功,如果在这里创建实例部分失败,会导致 DAG 信息不会更新,已经生成的实例节点在工作流日志中没法展示
|
||||
// 如果 job 信息缺失,在 dispatch 的时候会失败,继而使整个工作流失败
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(root.getJobId()).orElseGet(JobInfoDO::new);
|
||||
if (jobInfo.getId() == null) {
|
||||
// 在创建工作流实例到当前的这段时间内 job 信息被物理删除了
|
||||
log.error("[Workflow-{}|{}]job info of current node{nodeId={},jobId={}} is not present! maybe you have deleted the job!", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId());
|
||||
}
|
||||
nodeId2JobInfoMap.put(root.getNodeId(), jobInfo);
|
||||
// instanceParam 传递的是工作流实例的 wfContext
|
||||
Long instanceId = instanceService.create(root.getJobId(), wfInfo.getAppId(), jobInfo.getJobParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
|
||||
root.setInstanceId(instanceId);
|
||||
root.setStatus(InstanceStatus.RUNNING.getV());
|
||||
|
||||
@ -158,12 +168,12 @@ public class WorkflowInstanceManager {
|
||||
|
||||
// 持久化
|
||||
wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV());
|
||||
wfInstanceInfo.setDag(JSONObject.toJSONString(peWorkflowDAG));
|
||||
wfInstanceInfo.setDag(JSON.toJSONString(peWorkflowDAG));
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
|
||||
log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
|
||||
|
||||
// 真正开始执行根任务
|
||||
roots.forEach(root -> runInstance(root.getJobId(), root.getInstanceId(), wfInstanceId, wfRootInstanceParams));
|
||||
roots.forEach(root -> runInstance(nodeId2JobInfoMap.get(root.getNodeId()), root.getInstanceId()));
|
||||
} catch (Exception e) {
|
||||
|
||||
log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
|
||||
@ -173,11 +183,19 @@ public class WorkflowInstanceManager {
|
||||
|
||||
/**
|
||||
* 下一步(当工作流的某个任务完成时调用该方法)
|
||||
* ********************************************
|
||||
* 2021-02-03 modify by Echo009
|
||||
* 1、工作流支持配置重复的任务节点
|
||||
* 2、不再获取上游任务的结果作为实例参数而是传递工作流
|
||||
* 实例的 wfContext 作为 实例参数
|
||||
* ********************************************
|
||||
*
|
||||
* @param wfInstanceId 工作流任务实例ID
|
||||
* @param instanceId 具体完成任务的某个任务实例ID
|
||||
* @param status 完成任务的任务实例状态(SUCCEED/FAILED/STOPPED)
|
||||
* @param result 完成任务的任务实例结果
|
||||
*/
|
||||
@SuppressWarnings({"squid:S3776","squid:S2142","squid:S1141"})
|
||||
public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) {
|
||||
|
||||
int lockId = wfInstanceId.hashCode();
|
||||
@ -192,18 +210,16 @@ public class WorkflowInstanceManager {
|
||||
WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get();
|
||||
Long wfId = wfInstance.getWorkflowId();
|
||||
|
||||
// 特殊处理手动终止的情况
|
||||
if (status == InstanceStatus.STOPPED) {
|
||||
// 工作流已经不在运行状态了(由用户手动停止工作流实例导致),不需要任何操作
|
||||
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
|
||||
// 特殊处理手动终止 且 工作流实例已经不在运行状态的情况
|
||||
if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
|
||||
// 由用户手动停止工作流实例导致,不需要任何操作
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
PEWorkflowDAG dag = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
|
||||
// 保存 jobId -> Node 的映射关系(一个job只能出现一次的原因)
|
||||
Map<Long, PEWorkflowDAG.Node> jobId2Node = Maps.newHashMap();
|
||||
PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
|
||||
// 保存 nodeId -> Node 的映射关系
|
||||
Map<Long, PEWorkflowDAG.Node> nodeId2Node = Maps.newHashMap();
|
||||
|
||||
// 更新完成节点状态
|
||||
boolean allFinished = true;
|
||||
@ -218,11 +234,11 @@ public class WorkflowInstanceManager {
|
||||
if (InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) {
|
||||
allFinished = false;
|
||||
}
|
||||
jobId2Node.put(node.getJobId(), node);
|
||||
nodeId2Node.put(node.getNodeId(), node);
|
||||
}
|
||||
|
||||
wfInstance.setGmtModified(new Date());
|
||||
wfInstance.setDag(JSONObject.toJSONString(dag));
|
||||
wfInstance.setDag(JSON.toJSONString(dag));
|
||||
// 工作流已经结束(某个节点失败导致工作流整体已经失败),仅更新最新的DAG图
|
||||
if (!WorkflowInstanceStatus.generalizedRunningStatus.contains(wfInstance.getStatus())) {
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
|
||||
@ -265,42 +281,39 @@ public class WorkflowInstanceManager {
|
||||
dag.getEdges().forEach(edge -> relyMap.put(edge.getTo(), edge.getFrom()));
|
||||
|
||||
// 重新计算需要派发的任务
|
||||
Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
|
||||
Map<Long, String> jobId2InstanceParams = Maps.newHashMap();
|
||||
|
||||
relyMap.keySet().forEach(jobId -> {
|
||||
|
||||
List<PEWorkflowDAG.Node> readyNodes = Lists.newArrayList();
|
||||
Map<Long, JobInfoDO> nodeId2JobInfoMap = Maps.newHashMap();
|
||||
relyMap.keySet().forEach(nodeId -> {
|
||||
PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId);
|
||||
// 跳过已完成节点(理论上此处不可能出现 FAILED 的情况)和已派发节点(存在 InstanceId)
|
||||
if (jobId2Node.get(jobId).getStatus() == InstanceStatus.SUCCEED.getV() || jobId2Node.get(jobId).getInstanceId() != null) {
|
||||
if (currentNode.getStatus() == InstanceStatus.SUCCEED.getV() || currentNode.getInstanceId() != null) {
|
||||
return;
|
||||
}
|
||||
// 判断某个任务所有依赖的完成情况,只要有一个未成功,即无法执行
|
||||
for (Long reliedJobId : relyMap.get(jobId)) {
|
||||
if (jobId2Node.get(reliedJobId).getStatus() != InstanceStatus.SUCCEED.getV()) {
|
||||
for (Long reliedJobId : relyMap.get(nodeId)) {
|
||||
if (nodeId2Node.get(reliedJobId).getStatus() != InstanceStatus.SUCCEED.getV()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 所有依赖已经执行完毕,可以执行该任务 (为什么是 Key 是 String?在 JSON 标准中,key必须由双引号引起来,Long会导致结果无法被反序列化)
|
||||
Map<String, String> preJobId2Result = Maps.newHashMap();
|
||||
// 构建下一个任务的入参 (前置任务 jobId -> result)
|
||||
relyMap.get(jobId).forEach(jid -> preJobId2Result.put(String.valueOf(jid), jobId2Node.get(jid).getResult()));
|
||||
|
||||
Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JsonUtils.toJSONString(preJobId2Result), wfInstanceId, System.currentTimeMillis());
|
||||
jobId2Node.get(jobId).setInstanceId(newInstanceId);
|
||||
jobId2Node.get(jobId).setStatus(InstanceStatus.RUNNING.getV());
|
||||
|
||||
jobId2InstanceId.put(jobId, newInstanceId);
|
||||
jobId2InstanceParams.put(jobId, JSONObject.toJSONString(preJobId2Result));
|
||||
|
||||
log.debug("[Workflow-{}|{}] workflowInstance start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId);
|
||||
// 同理:这里必须保证任务实例全部创建成功,避免部分失败导致已经生成的实例节点在工作流日志中没法展示
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(currentNode.getJobId()).orElseGet(JobInfoDO::new);
|
||||
if (jobInfo.getId() == null) {
|
||||
// 在创建工作流实例到当前的这段时间内 job 信息被物理删除了
|
||||
log.error("[Workflow-{}|{}]job info of current node{nodeId={},jobId={}} is not present! maybe you have deleted the job!", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId());
|
||||
}
|
||||
nodeId2JobInfoMap.put(nodeId, jobInfo);
|
||||
// instanceParam 传递的是工作流实例的 wfContext
|
||||
Long newInstanceId = instanceService.create(jobInfo.getId(), wfInstance.getAppId(), jobInfo.getJobParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
|
||||
currentNode.setInstanceId(newInstanceId);
|
||||
currentNode.setStatus(InstanceStatus.RUNNING.getV());
|
||||
readyNodes.add(currentNode);
|
||||
log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId(), newInstanceId);
|
||||
});
|
||||
|
||||
wfInstance.setDag(JSONObject.toJSONString(dag));
|
||||
wfInstance.setDag(JSON.toJSONString(dag));
|
||||
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
|
||||
|
||||
// 持久化结束后,开始调度执行所有的任务
|
||||
jobId2InstanceId.forEach((jobId, newInstanceId) -> runInstance(jobId, newInstanceId, wfInstanceId, jobId2InstanceParams.get(jobId)));
|
||||
readyNodes.forEach(node -> runInstance(nodeId2JobInfoMap.get(node.getNodeId()), node.getInstanceId()));
|
||||
|
||||
} catch (Exception e) {
|
||||
onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance);
|
||||
@ -308,24 +321,24 @@ public class WorkflowInstanceManager {
|
||||
}
|
||||
|
||||
} catch (InterruptedException ignore) {
|
||||
// ignore
|
||||
} finally {
|
||||
segmentLock.unlock(lockId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 运行任务实例
|
||||
* 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题
|
||||
* @param jobId 任务ID
|
||||
*
|
||||
* @param jobInfo 任务信息
|
||||
* @param instanceId 任务实例ID
|
||||
* @param wfInstanceId 工作流任务实例ID
|
||||
* @param instanceParams 任务实例参数,值为上游任务的执行结果: preJobId to preJobInstanceResult
|
||||
*/
|
||||
private void runInstance(Long jobId, Long instanceId, Long wfInstanceId, String instanceParams) {
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
|
||||
private void runInstance(JobInfoDO jobInfo, Long instanceId) {
|
||||
// 洗去时间表达式类型
|
||||
jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV());
|
||||
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, wfInstanceId);
|
||||
dispatchService.dispatch(jobInfo, instanceId);
|
||||
}
|
||||
|
||||
private void onWorkflowInstanceFailed(String result, WorkflowInstanceInfoDO wfInstance) {
|
||||
@ -350,6 +363,7 @@ public class WorkflowInstanceManager {
|
||||
AlarmCenter.alarmFailed(content, userList);
|
||||
});
|
||||
} catch (Exception ignore) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
package com.github.kfcfans.powerjob.server.service.workflow;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.github.kfcfans.powerjob.common.PowerJobException;
|
||||
import com.github.kfcfans.powerjob.common.TimeExpressionType;
|
||||
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
|
||||
@ -18,6 +18,7 @@ import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.text.ParseException;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
@ -39,9 +40,9 @@ public class WorkflowService {
|
||||
* 保存/修改DAG工作流
|
||||
* @param req 请求
|
||||
* @return 工作流ID
|
||||
* @throws Exception 异常
|
||||
* @throws ParseException 异常
|
||||
*/
|
||||
public Long saveWorkflow(SaveWorkflowRequest req) throws Exception {
|
||||
public Long saveWorkflow(SaveWorkflowRequest req) throws ParseException {
|
||||
|
||||
req.valid();
|
||||
|
||||
@ -60,7 +61,7 @@ public class WorkflowService {
|
||||
|
||||
BeanUtils.copyProperties(req, wf);
|
||||
wf.setGmtModified(new Date());
|
||||
wf.setPeDAG(JSONObject.toJSONString(req.getPEWorkflowDAG()));
|
||||
wf.setPeDAG(JSON.toJSONString(req.getPEWorkflowDAG()));
|
||||
wf.setStatus(req.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
|
||||
wf.setTimeExpressionType(req.getTimeExpressionType().getV());
|
||||
|
||||
@ -147,9 +148,9 @@ public class WorkflowService {
|
||||
log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay);
|
||||
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay);
|
||||
if (delay <= 0) {
|
||||
workflowInstanceManager.start(wfInfo, wfInstanceId, initParams);
|
||||
workflowInstanceManager.start(wfInfo, wfInstanceId);
|
||||
}else {
|
||||
InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId, initParams));
|
||||
InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId));
|
||||
}
|
||||
return wfInstanceId;
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ public class RepositoryTest {
|
||||
@Test
|
||||
@Transactional
|
||||
public void testExecuteLogUpdate() {
|
||||
instanceInfoRepository.update4TriggerFailed(1586310414570L, 2, 100, System.currentTimeMillis(), System.currentTimeMillis(), "192.168.1.1", "NULL", "", new Date());
|
||||
instanceInfoRepository.update4TriggerFailed(1586310414570L, 2, System.currentTimeMillis(), System.currentTimeMillis(), "192.168.1.1", "NULL", new Date());
|
||||
instanceInfoRepository.update4FrequentJob(1586310419650L, 2, 200, new Date());
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user