fix: concurrency problem when process workflow instance

commit afff77b540 (parent e6127f1dba)
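The race this commit fixes: move() guarded the workflow instance with a manually managed segmentLock, while updateWorkflowContext locked under a different type ("updateWfContext") and stopWorkflowInstance / retryWorkflowInstance / markNodeAsSuccess took no lock at all. Two callers could therefore load the same WorkflowInstanceInfoDO row, modify their own copies of the DAG, and have the later saveAndFlush overwrite the earlier node status. The sketch below is an illustrative, self-contained demo of that lost-update pattern and of the cure applied here (every mutator serializing on one shared lock); it is not PowerJob code.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative only: two mutators doing read-modify-write on the same stored
    // row. Guarding BOTH with the same lock -- the analogue of giving every
    // workflow-instance method @UseSegmentLock(type = "processWfInstance",
    // key = "#wfInstanceId.intValue()") -- is what prevents the lost update.
    public class LostUpdateDemo {

        private static final Map<Long, String> STORE = new ConcurrentHashMap<>();
        private static final Object SHARED_LOCK = new Object();

        // Analogue of move(): append a node-status change.
        static void markNodeFinished(long wfInstanceId) {
            synchronized (SHARED_LOCK) {
                STORE.put(wfInstanceId, STORE.get(wfInstanceId) + " +nodeFinished");
            }
        }

        // Analogue of updateWorkflowContext(): append context data. Before the
        // fix this path locked under a different type, i.e. no mutual exclusion.
        static void appendContext(long wfInstanceId) {
            synchronized (SHARED_LOCK) {
                STORE.put(wfInstanceId, STORE.get(wfInstanceId) + " +context");
            }
        }

        public static void main(String[] args) throws InterruptedException {
            STORE.put(1L, "dag");
            Thread t1 = new Thread(() -> markNodeFinished(1L));
            Thread t2 = new Thread(() -> appendContext(1L));
            t1.start(); t2.start();
            t1.join(); t2.join();
            // With the shared lock both updates survive; without it, one
            // read-modify-write could silently overwrite the other.
            System.out.println(STORE.get(1L));
        }
    }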
@@ -55,24 +55,36 @@ public class InstanceLogService {
     private InstanceMetadataService instanceMetadataService;
     @Resource
     private GridFsManager gridFsManager;
-    // Bean for local database operations
+    /**
+     * Bean for local database operations
+     */
     @Resource(name = "localTransactionTemplate")
     private TransactionTemplate localTransactionTemplate;
     @Resource
     private LocalInstanceLogRepository localInstanceLogRepository;
 
-    // Task instance IDs whose online logs are kept locally
+    /**
+     * Task instance IDs whose online logs are kept locally
+     */
     private final Map<Long, Long> instanceId2LastReportTime = Maps.newConcurrentMap();
     private final ExecutorService workerPool;
 
-    // Segment lock
+    /**
+     * Segment lock
+     */
     private final SegmentLock segmentLock = new SegmentLock(8);
 
-    // Timestamp formatter
-    private static final FastDateFormat dateFormat = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
-    // Number of lines shown per page
+    /**
+     * Timestamp formatter
+     */
+    private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
+    /**
+     * Number of lines shown per page
+     */
     private static final int MAX_LINE_COUNT = 100;
-    // Expiration interval
+    /**
+     * Expiration interval
+     */
     private static final long EXPIRE_INTERVAL_MS = 60000;
 
     public InstanceLogService() {

@@ -110,7 +122,7 @@ public class InstanceLogService {
      * @param index page index, starting from 0
      * @return text string
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public StringPage fetchInstanceLog(Long appId, Long instanceId, Long index) {
         try {
             Future<File> fileFuture = prepareLogFile(instanceId);

@@ -154,7 +166,7 @@ public class InstanceLogService {
      * @param instanceId task instance ID
      * @return download URL
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public String fetchDownloadUrl(Long appId, Long instanceId) {
         String url = "http://" + NetUtils.getLocalHost() + ":" + port + "/instance/downloadLog?instanceId=" + instanceId;
         log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url);

@@ -326,7 +338,7 @@ public class InstanceLogService {
      */
     private static String convertLog(LocalInstanceLogDO instanceLog) {
         return String.format("%s [%s] %s %s",
-                dateFormat.format(instanceLog.getLogTime()),
+                DATE_FORMAT.format(instanceLog.getLogTime()),
                 instanceLog.getWorkerAddress(),
                 LogLevel.genLogLevelString(instanceLog.getLogLevel()),
                 instanceLog.getLogContent());
@@ -104,7 +104,7 @@ public class InstanceService {
      *
      * @param instanceId task instance ID
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public void stopInstance(Long appId,Long instanceId) {
 
         log.info("[Instance-{}] try to stop the instance instance in appId: {}", instanceId,appId);

@@ -152,7 +152,7 @@ public class InstanceService {
      *
      * @param instanceId task instance ID
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public void retryInstance(Long appId, Long instanceId) {
 
         log.info("[Instance-{}] retry instance in appId: {}", instanceId, appId);

@@ -186,6 +186,7 @@ public class InstanceService {
      *
      * @param instanceId task instance
      */
+    @DesignateServer
     public void cancelInstance(Long instanceId) {
         log.info("[Instance-{}] try to cancel the instance.", instanceId);
 
@@ -1,6 +1,7 @@
 package tech.powerjob.server.core.lock;
 
 import com.github.kfcfans.powerjob.common.utils.SegmentLock;
+import org.springframework.core.annotation.Order;
 import tech.powerjob.server.common.utils.AOPUtils;
 import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;

@@ -20,6 +21,7 @@ import java.util.Map;
 @Slf4j
 @Aspect
 @Component
+@Order(1)
 public class UseSegmentLockAspect {
 
     private final Map<String, SegmentLock> lockStore = Maps.newConcurrentMap();
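For context, a plausible sketch of what an aspect like this does (the advice body is not part of this diff, so the evaluateKey helper below is an assumption): it keeps one SegmentLock per annotation type in lockStore, resolves the SpEL key against the call's arguments, and holds that segment around the method invocation. The @Order(1) matters because DesignateServerAspect (further down in this commit) is @Order(0): a request is first forwarded to the owning server, so the lock is only taken on the server that actually does the work.

    import java.util.Map;

    import com.github.kfcfans.powerjob.common.utils.SegmentLock;
    import com.google.common.collect.Maps;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;

    // Sketch only: a minimal shape of such an aspect, not the actual PowerJob body.
    @Aspect
    @Component
    @Order(1) // runs inside DesignateServerAspect (@Order(0)): forward first, lock second
    public class UseSegmentLockAspectSketch {

        private final Map<String, SegmentLock> lockStore = Maps.newConcurrentMap();

        @Around(value = "@annotation(useSegmentLock)")
        public Object execute(ProceedingJoinPoint point, UseSegmentLock useSegmentLock) throws Throwable {
            // One SegmentLock per annotation type: methods that share a type share locks.
            SegmentLock segmentLock = lockStore.computeIfAbsent(
                    useSegmentLock.type(),
                    ignore -> new SegmentLock(useSegmentLock.concurrencyLevel()));
            int index = evaluateKey(point, useSegmentLock.key());
            segmentLock.lockInterruptible(index); // same call the old move() used directly
            try {
                return point.proceed();
            } finally {
                segmentLock.unlock(index);
            }
        }

        // Stand-in for SpEL evaluation of keys like "#wfInstanceId.intValue()";
        // the real aspect presumably delegates this to AOPUtils (imported in the diff above).
        private int evaluateKey(ProceedingJoinPoint point, String keyExpression) {
            return point.getArgs()[0].hashCode();
        }
    }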
@@ -145,7 +145,7 @@ public class JobService {
      * @param delay delay in milliseconds
      * @return task instance ID
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public long runJob(Long appId, Long jobId, String instanceParams, Long delay) {
 
         delay = delay == null ? 0 : delay;
@@ -5,7 +5,6 @@ import com.alibaba.fastjson.TypeReference;
 import com.github.kfcfans.powerjob.common.*;
 import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
 import com.github.kfcfans.powerjob.common.utils.JsonUtils;
-import com.github.kfcfans.powerjob.common.utils.SegmentLock;
 import tech.powerjob.server.common.constants.SwitchableStatus;
 import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
 import tech.powerjob.server.persistence.remote.model.*;

@@ -63,8 +62,6 @@ public class WorkflowInstanceManager {
     @Resource
     private WorkflowNodeInfoRepository workflowNodeInfoRepository;
 
-    private final SegmentLock segmentLock = new SegmentLock(16);
-
     /**
      * Create a workflow task instance
      * ********************************************

@@ -101,7 +98,7 @@ public class WorkflowInstanceManager {
         newWfInstance.setGmtModified(now);
 
         // Validate the DAG info
-        PEWorkflowDAG dag = null;
+        PEWorkflowDAG dag;
         try {
             dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
             // validate

@@ -151,6 +148,7 @@ public class WorkflowInstanceManager {
      * @param wfInfo workflow info
      * @param wfInstanceId workflow instance ID
      */
+    @UseSegmentLock(type = "startWfInstance", key = "#wfInfo.getId().intValue()", concurrencyLevel = 1024)
     public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) {
 
         Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);

@@ -169,7 +167,7 @@ public class WorkflowInstanceManager {
         if (wfInfo.getMaxWfInstanceNum() > 0) {
             // Concurrency control
             int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS);
-            if ( instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
+            if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
                 onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo);
                 return;
             }

@@ -230,128 +228,121 @@ public class WorkflowInstanceManager {
      * @param result result of the finished task instance
      */
     @SuppressWarnings({"squid:S3776", "squid:S2142", "squid:S1141"})
+    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
     public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) {
 
-        int lockId = wfInstanceId.hashCode();
-        try {
-            segmentLock.lockInterruptible(lockId);
-
-            Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
-            if (!wfInstanceInfoOpt.isPresent()) {
-                log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId);
-                return;
-            }
-            WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get();
-            Long wfId = wfInstance.getWorkflowId();
-
-            // Special handling for a manual stop while the workflow instance is no longer running
-            if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
-                // Caused by the user stopping the workflow instance manually, nothing to do
-                return;
-            }
-
-            try {
-                PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
-                // Update the status of the finished node
-                boolean allFinished = true;
-                PEWorkflowDAG.Node instanceNode = null;
-                for (PEWorkflowDAG.Node node : dag.getNodes()) {
-                    if (instanceId.equals(node.getInstanceId())) {
-                        node.setStatus(status.getV());
-                        node.setResult(result);
-                        instanceNode = node;
-                        log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getNodeId(), node.getJobId(), instanceId, status.name(), result);
-                    }
-                    if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
-                        allFinished = false;
-                    }
-                }
-                if (instanceNode == null) {
-                    // The node's instance in the DAG has been overwritten (an in-place retry generated new instance info), ignore it
-                    log.warn("[Workflow-{}|{}] current job instance(instanceId={}) is dissociative! it will be ignore! ", wfId, wfInstanceId, instanceId);
-                    return;
-                }
-
-                wfInstance.setGmtModified(new Date());
-                wfInstance.setDag(JSON.toJSONString(dag));
-                // The workflow has already finished (a failed node failed the whole workflow), only update the latest DAG
-                if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
-                    workflowInstanceInfoRepository.saveAndFlush(wfInstance);
-                    log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus());
-                    return;
-                }
-
-                // Task failed && skipping on failure not allowed: the DAG flow is interrupted and fails as a whole
-                if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) {
-                    log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
-                    onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance);
-                    return;
-                }
-
-                // The subtask was stopped manually
-                if (status == InstanceStatus.STOPPED) {
-                    wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
-                    wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_STOPPED);
-                    wfInstance.setFinishedTime(System.currentTimeMillis());
-                    workflowInstanceInfoRepository.saveAndFlush(wfInstance);
-
-                    log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId);
-                    return;
-                }
-                // Note: disabled nodes are skipped directly here
-                List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
-                // Must update again here because WorkflowDAGUtils#listReadyNodes may update node status
-                wfInstance.setDag(JSON.toJSONString(dag));
-                // If there are no ready nodes, check again whether everything has finished
-                if (readyNodes.isEmpty() && isFinish(dag)) {
-                    allFinished = true;
-                }
-                // Workflow finished (reaching here means all subtasks in the workflow succeeded)
-                if (allFinished) {
-                    wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
-                    // The result of the final task becomes the result of the whole workflow
-                    wfInstance.setResult(result);
-                    wfInstance.setFinishedTime(System.currentTimeMillis());
-                    workflowInstanceInfoRepository.saveAndFlush(wfInstance);
-
-                    log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
-                    return;
-                }
-
-                for (PEWorkflowDAG.Node readyNode : readyNodes) {
-                    // Likewise: all task instances must be created successfully here, otherwise a partial failure leaves already-generated instance nodes missing from the workflow log
-                    // instanceParam carries the workflow instance's wfContext
-                    Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getNodeParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
-                    readyNode.setInstanceId(newInstanceId);
-                    readyNode.setStatus(InstanceStatus.RUNNING.getV());
-                    log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId);
-                }
-                // The DAG info must be updated here as well
-                wfInstance.setDag(JSON.toJSONString(dag));
-                workflowInstanceInfoRepository.saveAndFlush(wfInstance);
-                // After persisting, start scheduling all the tasks
-                readyNodes.forEach(this::runInstance);
-
-            } catch (Exception e) {
-                onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance);
-                log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e);
-            }
-
-        } catch (InterruptedException ignore) {
-            // ignore
-        } finally {
-            segmentLock.unlock(lockId);
+        Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
+        if (!wfInstanceInfoOpt.isPresent()) {
+            log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId);
+            return;
+        }
+        WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get();
+        Long wfId = wfInstance.getWorkflowId();
+
+        // Special handling for a manual stop while the workflow instance is no longer running
+        if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
+            // Caused by the user stopping the workflow instance manually, nothing to do
+            return;
+        }
+
+        try {
+            PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
+            // Update the status of the finished node
+            boolean allFinished = true;
+            PEWorkflowDAG.Node instanceNode = null;
+            for (PEWorkflowDAG.Node node : dag.getNodes()) {
+                if (instanceId.equals(node.getInstanceId())) {
+                    node.setStatus(status.getV());
+                    node.setResult(result);
+                    instanceNode = node;
+                    log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getNodeId(), node.getJobId(), instanceId, status.name(), result);
+                }
+                if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
+                    allFinished = false;
+                }
+            }
+            if (instanceNode == null) {
+                // The node's instance in the DAG has been overwritten (an in-place retry generated new instance info), ignore it
+                log.warn("[Workflow-{}|{}] current job instance(instanceId={}) is dissociative! it will be ignore! ", wfId, wfInstanceId, instanceId);
+                return;
+            }
+
+            wfInstance.setGmtModified(new Date());
+            wfInstance.setDag(JSON.toJSONString(dag));
+            // The workflow has already finished (a failed node failed the whole workflow), only update the latest DAG
+            if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
+                workflowInstanceInfoRepository.saveAndFlush(wfInstance);
+                log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus());
+                return;
+            }
+
+            // Task failed && skipping on failure not allowed: the DAG flow is interrupted and fails as a whole
+            if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) {
+                log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
+                onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance);
+                return;
+            }
+
+            // The subtask was stopped manually
+            if (status == InstanceStatus.STOPPED) {
+                wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
+                wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_STOPPED);
+                wfInstance.setFinishedTime(System.currentTimeMillis());
+                workflowInstanceInfoRepository.saveAndFlush(wfInstance);
+
+                log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId);
+                return;
+            }
+            // Note: disabled nodes are skipped directly here
+            List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
+            // Must update again here because WorkflowDAGUtils#listReadyNodes may update node status
+            wfInstance.setDag(JSON.toJSONString(dag));
+            // If there are no ready nodes, check again whether everything has finished
+            if (readyNodes.isEmpty() && isFinish(dag)) {
+                allFinished = true;
+            }
+            // Workflow finished (reaching here means all subtasks in the workflow succeeded)
+            if (allFinished) {
+                wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
+                // The result of the final task becomes the result of the whole workflow
+                wfInstance.setResult(result);
+                wfInstance.setFinishedTime(System.currentTimeMillis());
+                workflowInstanceInfoRepository.saveAndFlush(wfInstance);
+
+                log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
+                return;
+            }
+
+            for (PEWorkflowDAG.Node readyNode : readyNodes) {
+                // Likewise: all task instances must be created successfully here, otherwise a partial failure leaves already-generated instance nodes missing from the workflow log
+                // instanceParam carries the workflow instance's wfContext
+                Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getNodeParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
+                readyNode.setInstanceId(newInstanceId);
+                readyNode.setStatus(InstanceStatus.RUNNING.getV());
+                log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId);
+            }
+            // The DAG info must be updated here as well
+            wfInstance.setDag(JSON.toJSONString(dag));
+            workflowInstanceInfoRepository.saveAndFlush(wfInstance);
+            // After persisting, start scheduling all the tasks
+            readyNodes.forEach(this::runInstance);
+
+        } catch (Exception e) {
+            onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance);
+            log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e);
         }
     }
 
     /**
      * Update the workflow context
+     * fix: this must use the same lock as the other methods that operate on workflow instances, otherwise there is a concurrency problem and node status can be overwritten
      *
      * @param wfInstanceId workflow instance ID
      * @param appendedWfContextData appended context data
      * @since 2021/02/05
      */
-    @UseSegmentLock(type = "updateWfContext", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
+    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
     public void updateWorkflowContext(Long wfInstanceId, Map<String, String> appendedWfContextData) {
 
         try {
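The heart of the fix is the lock's identity. A @UseSegmentLock guard is keyed by its type string, so the old updateWorkflowContext (type "updateWfContext") and move() (a manual segmentLock private to WorkflowInstanceManager) never contended with each other. In terms of the lockStore in the aspect sketch earlier, two type strings mean two unrelated SegmentLock instances; the snippet below is illustrative:

    // Two different type strings yield two independent SegmentLock instances, so
    // the annotated methods never excluded each other, even for the same wfInstanceId.
    SegmentLock a = lockStore.computeIfAbsent("processWfInstance", t -> new SegmentLock(1024));
    SegmentLock b = lockStore.computeIfAbsent("updateWfContext", t -> new SegmentLock(1024));
    // a != b. After this commit, every workflow-instance mutator uses "processWfInstance".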
@@ -8,6 +8,7 @@ import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
 import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
 import com.github.kfcfans.powerjob.common.response.WorkflowInstanceInfoDTO;
 import tech.powerjob.server.common.constants.SwitchableStatus;
+import tech.powerjob.server.core.lock.UseSegmentLock;
 import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
 import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
 import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;

@@ -53,7 +54,8 @@ public class WorkflowInstanceService {
      * @param wfInstanceId workflow instance ID
      * @param appId owning app ID
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
+    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
     public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
         WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
         if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {

@@ -92,7 +94,8 @@ public class WorkflowInstanceService {
      * @param wfInstanceId workflow instance ID
      * @param appId app ID
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
+    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
     public void retryWorkflowInstance(Long wfInstanceId, Long appId) {
         WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
         // Only failed workflows may be retried

@@ -107,7 +110,7 @@ public class WorkflowInstanceService {
             throw new PowerJobException("you can't retry the workflow instance which is missing job info!");
         }
         // Validate the DAG info
-        PEWorkflowDAG dag = null;
+        PEWorkflowDAG dag;
         try {
             dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
             if (!WorkflowDAGUtils.valid(dag)) {

@@ -161,13 +164,17 @@ public class WorkflowInstanceService {
      * and it only operates on node info (status, result) in the workflow instance's DAG
      * without changing anything in the corresponding task instances
      *
+     * better take the lock anyway, just to be safe ~
+     *
      * @param wfInstanceId workflow instance ID
      * @param nodeId node ID
      */
+    @DesignateServer
+    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
     public void markNodeAsSuccess(Long appId, Long wfInstanceId, Long nodeId) {
 
         WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
-        // Validate workflow instance status; running instances must not be processed,
+        // Validate workflow instance status; running instances must not be processed
         if (WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
             throw new PowerJobException("you can't mark the node in a running workflow!");
         }
@@ -265,7 +265,7 @@ public class WorkflowService {
      * @param delay delay time
      * @return instanceId of this workflow instance (wfInstanceId)
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public Long runWorkflow(Long wfId, Long appId, String initParams, Long delay) {
 
         delay = delay == null ? 0 : delay;
@@ -18,17 +18,39 @@ import java.util.Optional;
  */
 public interface WorkflowInstanceInfoRepository extends JpaRepository<WorkflowInstanceInfoDO, Long> {
 
+    /**
+     * Find the corresponding workflow instance
+     * @param wfInstanceId instance ID
+     * @return workflow instance
+     */
     Optional<WorkflowInstanceInfoDO> findByWfInstanceId(Long wfInstanceId);
 
-    // Delete historical data. JPA's built-in delete loops over IDs one by one; deleting 2000 rows took several seconds, which is way too slow...
-    // The result can only be received as an int
+    /**
+     * Delete historical data. JPA's built-in delete loops over IDs one by one; deleting 2000 rows took several seconds, which is way too slow...
+     * The result can only be received as an int
+     * @param time update-time threshold
+     * @param status status list
+     * @return number of deleted records
+     */
     @Modifying
-    @Transactional
+    @Transactional(rollbackOn = Exception.class)
     @Query(value = "delete from WorkflowInstanceInfoDO where gmtModified < ?1 and status in ?2")
     int deleteAllByGmtModifiedBeforeAndStatusIn(Date time, List<Integer> status);
 
+    /**
+     * Count the instances of this workflow that are in the given statuses
+     * @param workflowId workflow ID
+     * @param status status list
+     * @return number of matching records
+     */
     int countByWorkflowIdAndStatusIn(Long workflowId, List<Integer> status);
 
-    // Status check
+    /**
+     * Load instances whose expected trigger time is below the given threshold
+     * @param appIds app ID list
+     * @param status status
+     * @param time expected trigger time threshold
+     * @return list of workflow instances
+     */
     List<WorkflowInstanceInfoDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> appIds, int status, long time);
 }
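The bulk delete above runs as a single JPQL statement instead of JPA's row-by-row default. A hypothetical call site (the 7-day threshold, status values, and surrounding cleaner are illustrative, not from this diff) might look like:

    // Hypothetical cleanup routine: remove workflow instances that finished more
    // than 7 days ago in one bulk statement. The chosen statuses are examples.
    Date threshold = new Date(System.currentTimeMillis() - 7L * 24 * 3600 * 1000);
    List<Integer> finishedStatus = Arrays.asList(
            WorkflowInstanceStatus.SUCCEED.getV(),
            WorkflowInstanceStatus.STOPPED.getV());
    int removed = workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(threshold, finishedStatus);
    log.info("[WorkflowInstanceCleaner] deleted {} finished workflow instances", removed);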
@@ -17,8 +17,8 @@ import java.lang.annotation.Target;
 public @interface DesignateServer {
 
     /**
-     * Forwarding the request needs the currentServer info under AppInfo, so appId must be one of the parameters; this field specifies the parameter name of the appId argument
+     * Forwarding the request needs the currentServer info under AppInfo, so appId must be one of the parameters; this field specifies the parameter name of the appId argument, defaulting to appId
      * @return name of the appId parameter
      */
-    String appIdParameterName();
+    String appIdParameterName() default "appId";
 }
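With the default in place, the many @DesignateServer(appIdParameterName = "appId") usages throughout this commit collapse to the bare annotation. Both forms below are equivalent whenever the method takes a parameter named appId (bodies elided):

    // Before: the parameter name had to be spelled out at every call site.
    @DesignateServer(appIdParameterName = "appId")
    public void stopInstance(Long appId, Long instanceId) { /* ... */ }

    // After: the bare annotation relies on the default and finds the appId parameter.
    @DesignateServer
    public void stopInstance(Long appId, Long instanceId) { /* ... */ }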
@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.github.kfcfans.powerjob.common.PowerJobException;
 import com.github.kfcfans.powerjob.common.RemoteConstant;
 import com.github.kfcfans.powerjob.common.response.AskResponse;
+import org.springframework.core.annotation.Order;
 import tech.powerjob.server.persistence.remote.model.AppInfoDO;
 import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
 import tech.powerjob.server.remote.transport.starter.AkkaStarter;

@@ -37,12 +38,13 @@ import java.util.concurrent.CompletionStage;
 @Slf4j
 @Aspect
 @Component
+@Order(0)
 public class DesignateServerAspect {
 
     @Resource
     private AppInfoRepository appInfoRepository;
 
-    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     @Around(value = "@annotation(designateServer))")
     public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable {

@@ -99,7 +101,7 @@ public class DesignateServerAspect {
         Method method = methodSignature.getMethod();
         JavaType returnType = getMethodReturnJavaType(method);
 
-        return objectMapper.readValue(askResponse.getData(), returnType);
+        return OBJECT_MAPPER.readValue(askResponse.getData(), returnType);
     }
 
 
@@ -53,7 +53,7 @@ public class WorkerClusterQueryService {
         return workers;
     }
 
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public List<WorkerInfo> getAllWorkers(Long appId) {
         List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());
         workers.sort((o1, o2) -> o2 .getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());