From afff77b540d0f612263664c9e44ecc5a0d481e6b Mon Sep 17 00:00:00 2001
From: Echo009
Date: Mon, 8 Mar 2021 16:14:08 +0800
Subject: [PATCH] fix: concurrency problem when processing workflow instances

---
 .../core/instance/InstanceLogService.java     |  32 ++-
 .../server/core/instance/InstanceService.java |   5 +-
 .../core/lock/UseSegmentLockAspect.java       |   2 +
 .../server/core/service/JobService.java       |   2 +-
 .../workflow/WorkflowInstanceManager.java     | 221 +++++++++---------
 .../workflow/WorkflowInstanceService.java     |  15 +-
 .../server/core/workflow/WorkflowService.java |   2 +-
 .../WorkflowInstanceInfoRepository.java       |  30 ++-
 .../server/redirector/DesignateServer.java    |   4 +-
 .../redirector/DesignateServerAspect.java     |   6 +-
 .../worker/WorkerClusterQueryService.java     |   2 +-
 11 files changed, 179 insertions(+), 142 deletions(-)

diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java
index 7efded31..172c9336 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java
@@ -55,24 +55,36 @@ public class InstanceLogService {
     private InstanceMetadataService instanceMetadataService;
     @Resource
     private GridFsManager gridFsManager;
-    // Bean for local database operations
+    /**
+     * Bean for local database operations
+     */
     @Resource(name = "localTransactionTemplate")
     private TransactionTemplate localTransactionTemplate;
     @Resource
     private LocalInstanceLogRepository localInstanceLogRepository;

-    // IDs of the task instances whose online logs are maintained locally
+    /**
+     * IDs of the task instances whose online logs are maintained locally
+     */
     private final Map<Long, Long> instanceId2LastReportTime = Maps.newConcurrentMap();

     private final ExecutorService workerPool;

-    // Segment lock
+    /**
+     * Segment lock
+     */
     private final SegmentLock segmentLock = new SegmentLock(8);
-    // Timestamp formatter
-    private static final FastDateFormat dateFormat = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
-    // Number of lines displayed per page
+    /**
+     * Timestamp formatter
+     */
+    private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
+    /**
+     * Number of lines displayed per page
+     */
     private static final int MAX_LINE_COUNT = 100;
-    // Expiration interval
+    /**
+     * Expiration interval
+     */
     private static final long EXPIRE_INTERVAL_MS = 60000;

     public InstanceLogService() {
@@ -110,7 +122,7 @@
      * @param index page number, starting from 0
      * @return text content
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public StringPage fetchInstanceLog(Long appId, Long instanceId, Long index) {
         try {
             Future<File> fileFuture = prepareLogFile(instanceId);
@@ -154,7 +166,7 @@
      * @param instanceId task instance ID
      * @return download URL
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public String fetchDownloadUrl(Long appId, Long instanceId) {
         String url = "http://" + NetUtils.getLocalHost() + ":" + port + "/instance/downloadLog?instanceId=" + instanceId;
         log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url);
@@ -326,7 +338,7 @@
      */
     private static String convertLog(LocalInstanceLogDO instanceLog) {
         return String.format("%s [%s] %s %s",
-                dateFormat.format(instanceLog.getLogTime()),
+                DATE_FORMAT.format(instanceLog.getLogTime()),
                 instanceLog.getWorkerAddress(),
                 LogLevel.genLogLevelString(instanceLog.getLogLevel()),
                 instanceLog.getLogContent());
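
The SegmentLock used above (and removed from WorkflowInstanceManager later in this patch) hashes a key onto a small, fixed pool of locks: operations on different instances rarely contend, while operations on the same instance always serialize. A minimal sketch of the idea, assuming a plain ReentrantLock array; the real implementation lives in powerjob-common and differs in detail:

    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative sketch only, not the powerjob-common implementation.
    public class SimpleSegmentLock {
        private final ReentrantLock[] locks;

        public SimpleSegmentLock(int segments) {
            locks = new ReentrantLock[segments];
            for (int i = 0; i < segments; i++) {
                locks[i] = new ReentrantLock();
            }
        }

        private ReentrantLock lockFor(int key) {
            // mask the sign bit: Math.abs(Integer.MIN_VALUE) is still negative
            return locks[(key & Integer.MAX_VALUE) % locks.length];
        }

        public void lock(int key)   { lockFor(key).lock(); }
        public void unlock(int key) { lockFor(key).unlock(); }
    }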
diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java
index 7f0c9d7c..cf62454b 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java
@@ -104,7 +104,7 @@
      *
      * @param instanceId task instance ID
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public void stopInstance(Long appId, Long instanceId) {

         log.info("[Instance-{}] try to stop the instance in appId: {}", instanceId, appId);
@@ -152,7 +152,7 @@
      *
      * @param instanceId task instance ID
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
    public void retryInstance(Long appId, Long instanceId) {

         log.info("[Instance-{}] retry instance in appId: {}", instanceId, appId);
@@ -186,6 +186,7 @@
      *
      * @param instanceId task instance
      */
+    @DesignateServer
     public void cancelInstance(Long instanceId) {
         log.info("[Instance-{}] try to cancel the instance.", instanceId);

diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseSegmentLockAspect.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseSegmentLockAspect.java
index b74bff1e..9500f5b6 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseSegmentLockAspect.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseSegmentLockAspect.java
@@ -1,6 +1,7 @@
 package tech.powerjob.server.core.lock;

 import com.github.kfcfans.powerjob.common.utils.SegmentLock;
+import org.springframework.core.annotation.Order;
 import tech.powerjob.server.common.utils.AOPUtils;
 import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
@@ -20,6 +21,7 @@
 @Slf4j
 @Aspect
 @Component
+@Order(1)
 public class UseSegmentLockAspect {

     private final Map<String, SegmentLock> lockStore = Maps.newConcurrentMap();

diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java
index 9cc47f0b..d20bb862 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java
@@ -145,7 +145,7 @@ public class JobService {
      * @param delay delay time, in milliseconds
      * @return task instance ID
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public long runJob(Long appId, Long jobId, String instanceParams, Long delay) {

         delay = delay == null ? 0 : delay;
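
The @Order(1) added above pairs with the @Order(0) added to DesignateServerAspect further down: Spring applies lower-ordered @Around advice outermost, so the redirect decision is always made before any segment lock is taken, and a server never blocks on a lock for a request it is about to hand off to another server. A runnable sketch of how ordered advice nests, with plain decorators standing in for Spring AOP:

    import java.util.function.Supplier;

    // Lower order = applied outermost, exactly like Spring's @Order on aspects.
    public class OrderDemo {
        static Supplier<String> around(String name, Supplier<String> inner) {
            return () -> name + "[" + inner.get() + "]";
        }

        public static void main(String[] args) {
            Supplier<String> target = () -> "stopWorkflowInstance";
            Supplier<String> chain = around("designateServer(order=0)",
                                     around("useSegmentLock(order=1)", target));
            // prints: designateServer(order=0)[useSegmentLock(order=1)[stopWorkflowInstance]]
            System.out.println(chain.get());
        }
    }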
diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java
index dffc5ac4..b7c7847c 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java
@@ -5,7 +5,6 @@ import com.alibaba.fastjson.TypeReference;
 import com.github.kfcfans.powerjob.common.*;
 import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
 import com.github.kfcfans.powerjob.common.utils.JsonUtils;
-import com.github.kfcfans.powerjob.common.utils.SegmentLock;
 import tech.powerjob.server.common.constants.SwitchableStatus;
 import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
 import tech.powerjob.server.persistence.remote.model.*;
@@ -63,8 +62,6 @@
     @Resource
     private WorkflowNodeInfoRepository workflowNodeInfoRepository;

-    private final SegmentLock segmentLock = new SegmentLock(16);
-
     /**
      * Create a workflow instance
      * ********************************************
@@ -101,7 +98,7 @@
         newWfInstance.setGmtModified(now);

         // validate DAG info
-        PEWorkflowDAG dag = null;
+        PEWorkflowDAG dag;
         try {
             dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
             // validate
@@ -151,6 +148,7 @@
      * @param wfInfo workflow info
      * @param wfInstanceId workflow instance ID
      */
+    @UseSegmentLock(type = "startWfInstance", key = "#wfInfo.getId().intValue()", concurrencyLevel = 1024)
     public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) {

         Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
@@ -169,7 +167,7 @@
         if (wfInfo.getMaxWfInstanceNum() > 0) {
             // concurrency control
             int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS);
-            if ( instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
+            if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
                 onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo);
                 return;
             }
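
start() above drops the hand-rolled segmentLock field in favor of the declarative @UseSegmentLock. The annotation's definition is not part of this patch; inferred from its call sites, its shape is roughly the following sketch (the real definition lives in tech.powerjob.server.core.lock and may differ):

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Sketch inferred from usage; attribute names match the call sites, the rest is assumption.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface UseSegmentLock {
        // logical lock group: methods sharing a type and key serialize with each other
        String type();
        // SpEL expression evaluated against the method arguments, e.g. "#wfInstanceId.intValue()"
        String key();
        // number of segments backing the lock pool for this type
        int concurrencyLevel() default 16;
    }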
@@ -230,128 +228,121 @@
      * @param result result of the finished task instance
      */
     @SuppressWarnings({"squid:S3776", "squid:S2142", "squid:S1141"})
+    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
     public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) {
-        int lockId = wfInstanceId.hashCode();
-        try {
-            segmentLock.lockInterruptible(lockId);
-
-            Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
-            if (!wfInstanceInfoOpt.isPresent()) {
-                log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId);
-                return;
-            }
-            WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get();
-            Long wfId = wfInstance.getWorkflowId();
-
-            // special case: manually stopped while the workflow instance is no longer in a running state
-            if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
-                // caused by the user manually stopping the workflow instance, nothing needs to be done
-                return;
-            }
-
-            try {
-                PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
-                // update the status of the finished node
-                boolean allFinished = true;
-                PEWorkflowDAG.Node instanceNode = null;
-                for (PEWorkflowDAG.Node node : dag.getNodes()) {
-                    if (instanceId.equals(node.getInstanceId())) {
-                        node.setStatus(status.getV());
-                        node.setResult(result);
-                        instanceNode = node;
-                        log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getNodeId(), node.getJobId(), instanceId, status.name(), result);
-                    }
-                    if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
-                        allFinished = false;
-                    }
-                }
-                if (instanceNode == null) {
-                    // the node instance in the DAG has been overwritten (an in-place retry generated new instance info), ignore it
-                    log.warn("[Workflow-{}|{}] current job instance(instanceId={}) is dissociative! it will be ignored!", wfId, wfInstanceId, instanceId);
-                    return;
-                }
-
-                wfInstance.setGmtModified(new Date());
-                wfInstance.setDag(JSON.toJSONString(dag));
-                // the workflow has already finished (a node failure already failed the whole workflow), only update the latest DAG
-                if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
-                    workflowInstanceInfoRepository.saveAndFlush(wfInstance);
-                    log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus());
-                    return;
-                }
-
-                // the task failed and skip-on-failure is not allowed: the DAG flow is interrupted and the whole workflow fails
-                if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) {
-                    log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
-                    onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance);
-                    return;
-                }
-
-                // the subtask was stopped manually
-                if (status == InstanceStatus.STOPPED) {
-                    wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
-                    wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_STOPPED);
-                    wfInstance.setFinishedTime(System.currentTimeMillis());
-                    workflowInstanceInfoRepository.saveAndFlush(wfInstance);
-
-                    log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId);
-                    return;
-                }
-                // note: disabled nodes are skipped directly here
-                List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
-                // must refresh here, because WorkflowDAGUtils#listReadyNodes may update node status
-                wfInstance.setDag(JSON.toJSONString(dag));
-                // if there are no ready nodes, check again whether everything has finished
-                if (readyNodes.isEmpty() && isFinish(dag)) {
-                    allFinished = true;
-                }
-                // the workflow has finished (reaching this point means every subtask in the workflow succeeded)
-                if (allFinished) {
-                    wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
-                    // the result of the final task becomes the result of the whole workflow
-                    wfInstance.setResult(result);
-                    wfInstance.setFinishedTime(System.currentTimeMillis());
-                    workflowInstanceInfoRepository.saveAndFlush(wfInstance);
-
-                    log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
-                    return;
-                }
-
-                for (PEWorkflowDAG.Node readyNode : readyNodes) {
-                    // likewise: every task instance must be created successfully here, lest a partial failure leave generated instance nodes missing from the workflow log
-                    // instanceParam carries the wfContext of the workflow instance
-                    Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getNodeParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
-                    readyNode.setInstanceId(newInstanceId);
-                    readyNode.setStatus(InstanceStatus.RUNNING.getV());
-                    log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId);
-                }
-                // the DAG info must be updated here as well
-                wfInstance.setDag(JSON.toJSONString(dag));
-                workflowInstanceInfoRepository.saveAndFlush(wfInstance);
-                // after persisting, start dispatching all the tasks
-                readyNodes.forEach(this::runInstance);
-
-            } catch (Exception e) {
-                onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance);
-                log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e);
-            }
-
-        } catch (InterruptedException ignore) {
-            // ignore
-        } finally {
-            segmentLock.unlock(lockId);
+        Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
+        if (!wfInstanceInfoOpt.isPresent()) {
+            log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId);
+            return;
         }
+        WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get();
+        Long wfId = wfInstance.getWorkflowId();
+
+        // special case: manually stopped while the workflow instance is no longer in a running state
+        if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
+            // caused by the user manually stopping the workflow instance, nothing needs to be done
+            return;
+        }
+
+        try {
+            PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
+            // update the status of the finished node
+            boolean allFinished = true;
+            PEWorkflowDAG.Node instanceNode = null;
+            for (PEWorkflowDAG.Node node : dag.getNodes()) {
+                if (instanceId.equals(node.getInstanceId())) {
+                    node.setStatus(status.getV());
+                    node.setResult(result);
+                    instanceNode = node;
+                    log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getNodeId(), node.getJobId(), instanceId, status.name(), result);
+                }
+                if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) {
+                    allFinished = false;
+                }
+            }
+            if (instanceNode == null) {
+                // the node instance in the DAG has been overwritten (an in-place retry generated new instance info), ignore it
+                log.warn("[Workflow-{}|{}] current job instance(instanceId={}) is dissociative! it will be ignored!", wfId, wfInstanceId, instanceId);
+                return;
+            }
+
+            wfInstance.setGmtModified(new Date());
+            wfInstance.setDag(JSON.toJSONString(dag));
+            // the workflow has already finished (a node failure already failed the whole workflow), only update the latest DAG
+            if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
+                workflowInstanceInfoRepository.saveAndFlush(wfInstance);
+                log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus());
+                return;
+            }
+
+            // the task failed and skip-on-failure is not allowed: the DAG flow is interrupted and the whole workflow fails
+            if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) {
+                log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId);
+                onWorkflowInstanceFailed(SystemInstanceResult.MIDDLE_JOB_FAILED, wfInstance);
+                return;
+            }
+
+            // the subtask was stopped manually
+            if (status == InstanceStatus.STOPPED) {
+                wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV());
+                wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_STOPPED);
+                wfInstance.setFinishedTime(System.currentTimeMillis());
+                workflowInstanceInfoRepository.saveAndFlush(wfInstance);
+
+                log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId);
+                return;
+            }
+            // note: disabled nodes are skipped directly here
+            List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
+            // must refresh here, because WorkflowDAGUtils#listReadyNodes may update node status
+            wfInstance.setDag(JSON.toJSONString(dag));
+            // if there are no ready nodes, check again whether everything has finished
+            if (readyNodes.isEmpty() && isFinish(dag)) {
+                allFinished = true;
+            }
+            // the workflow has finished (reaching this point means every subtask in the workflow succeeded)
+            if (allFinished) {
+                wfInstance.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
+                // the result of the final task becomes the result of the whole workflow
+                wfInstance.setResult(result);
+                wfInstance.setFinishedTime(System.currentTimeMillis());
+                workflowInstanceInfoRepository.saveAndFlush(wfInstance);
+
+                log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId);
+                return;
+            }
+
+            for (PEWorkflowDAG.Node readyNode : readyNodes) {
+                // likewise: every task instance must be created successfully here, lest a partial failure leave generated instance nodes missing from the workflow log
+                // instanceParam carries the wfContext of the workflow instance
+                Long newInstanceId = instanceService.create(readyNode.getJobId(), wfInstance.getAppId(), readyNode.getNodeParams(), wfInstance.getWfContext(), wfInstanceId, System.currentTimeMillis());
+                readyNode.setInstanceId(newInstanceId);
+                readyNode.setStatus(InstanceStatus.RUNNING.getV());
+                log.debug("[Workflow-{}|{}] workflowInstance start to process new node(nodeId={},jobId={},instanceId={})", wfId, wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), newInstanceId);
+            }
+            // the DAG info must be updated here as well
+            wfInstance.setDag(JSON.toJSONString(dag));
+            workflowInstanceInfoRepository.saveAndFlush(wfInstance);
+            // after persisting, start dispatching all the tasks
+            readyNodes.forEach(this::runInstance);
+
+        } catch (Exception e) {
+            onWorkflowInstanceFailed("MOVE NEXT STEP FAILED: " + e.getMessage(), wfInstance);
+            log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e);
+        }
+
     }

     /**
      * Update the workflow context
+     * fix: must use the same lock as the other methods that operate on the workflow instance; otherwise there are concurrency problems and node status can be overwritten
      *
      * @param wfInstanceId workflow instance
      * @param appendedWfContextData appended context data
      * @since 2021/02/05
      */
-    @UseSegmentLock(type = "updateWfContext", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
+    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
     public void updateWorkflowContext(Long wfInstanceId, Map<String, String> appendedWfContextData) {
         try {
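
This is the heart of the fix. move() and updateWorkflowContext() both read the workflow instance row, modify its serialized DAG or context, and save it back; previously they were guarded by unrelated locks (move() by the private SegmentLock keyed on wfInstanceId.hashCode(), updateWorkflowContext() by the "updateWfContext" segment lock), so the two read-modify-write sections could interleave and the later saveAndFlush silently discarded the earlier one's changes, overwriting node status. A self-contained demonstration of that lost-update pattern, with illustrative names only:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.locks.ReentrantLock;

    // Two writers guard the same record with two DIFFERENT locks,
    // so their read-modify-write sections interleave and one update is lost.
    public class LostUpdateDemo {
        static volatile String record = "nodeStatus=RUNNING;ctx={}";
        static final ReentrantLock moveLock = new ReentrantLock(); // what move() used
        static final ReentrantLock ctxLock = new ReentrantLock();  // what updateWorkflowContext() used

        public static void main(String[] args) throws InterruptedException {
            CountDownLatch bothLoaded = new CountDownLatch(2);
            Thread move = new Thread(() -> {
                moveLock.lock();
                try {
                    String snapshot = record;                        // read
                    awaitQuietly(bothLoaded);                        // force the interleaving
                    record = snapshot.replace("RUNNING", "SUCCEED"); // write node status
                } finally { moveLock.unlock(); }
            });
            Thread updateCtx = new Thread(() -> {
                ctxLock.lock();
                try {
                    String snapshot = record;                        // reads the same stale version
                    awaitQuietly(bothLoaded);
                    record = snapshot.replace("{}", "{k=v}");        // write context, clobbering the other write
                } finally { ctxLock.unlock(); }
            });
            move.start();
            updateCtx.start();
            move.join();
            updateCtx.join();
            // one of the two updates is always missing from the output
            System.out.println(record);
        }

        static void awaitQuietly(CountDownLatch latch) {
            latch.countDown();
            try { latch.await(); } catch (InterruptedException ignored) { }
        }
    }

With both methods annotated @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()"), the same wfInstanceId always maps to the same lock, so the two sections can no longer interleave.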
diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java
index 921e937e..c0c7ef86 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java
@@ -8,6 +8,7 @@ import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
 import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
 import com.github.kfcfans.powerjob.common.response.WorkflowInstanceInfoDTO;
 import tech.powerjob.server.common.constants.SwitchableStatus;
+import tech.powerjob.server.core.lock.UseSegmentLock;
 import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
 import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
 import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
@@ -53,7 +54,8 @@
      * @param wfInstanceId workflow instance ID
      * @param appId owning application ID
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
+    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
     public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
         WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
         if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
@@ -92,7 +94,8 @@
      * @param wfInstanceId workflow instance ID
      * @param appId application ID
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
+    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
     public void retryWorkflowInstance(Long wfInstanceId, Long appId) {
         WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
         // only failed workflows may be retried
@@ -107,7 +110,7 @@
             throw new PowerJobException("you can't retry the workflow instance which is missing job info!");
         }
         // validate DAG info
-        PEWorkflowDAG dag = null;
+        PEWorkflowDAG dag;
         try {
             dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class);
             if (!WorkflowDAGUtils.valid(dag)) {
@@ -161,13 +164,17 @@
      * and it only touches node info (status, result) in the workflow instance's DAG,
      * never changing anything in the corresponding task instances
      *
+     * better take the lock anyway, just to be safe ~
+     *
      * @param wfInstanceId workflow instance ID
      * @param nodeId node ID
      */
+    @DesignateServer
+    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
     public void markNodeAsSuccess(Long appId, Long wfInstanceId, Long nodeId) {

         WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
-        // validate the workflow instance status; running instances must not be processed,
+        // validate the workflow instance status; running instances must not be processed
         if (WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
             throw new PowerJobException("you can't mark the node in a running workflow!");
         }

diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java
index 24695e86..b6c0c966 100644
--- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java
+++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowService.java
@@ -265,7 +265,7 @@
      * @param delay delay time
      * @return the instanceId of this workflow instance (wfInstanceId)
      */
-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public Long runWorkflow(Long wfId, Long appId, String initParams, Long delay) {

         delay = delay == null ? 0 : delay;
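
stopWorkflowInstance, retryWorkflowInstance and markNodeAsSuccess now join the same critical section as move() and updateWorkflowContext(), so every server-side mutation of a workflow instance follows one pattern: redirect first (@Order(0)), then lock (@Order(1)), then mutate. A hypothetical future method would presumably copy the same two annotations:

    // Hypothetical method, shown only to illustrate the pattern.
    @DesignateServer
    @UseSegmentLock(type = "processWfInstance", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
    public void pauseWorkflowInstance(Long wfInstanceId, Long appId) {
        // runs on the server owning appId, serialized with move()/stop/retry/updateWorkflowContext
        // for this particular wfInstanceId
    }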
diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/repository/WorkflowInstanceInfoRepository.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/repository/WorkflowInstanceInfoRepository.java
index 3782d864..74a11066 100644
--- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/repository/WorkflowInstanceInfoRepository.java
+++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/repository/WorkflowInstanceInfoRepository.java
@@ -18,17 +18,39 @@ import java.util.Optional;
  */
 public interface WorkflowInstanceInfoRepository extends JpaRepository<WorkflowInstanceInfoDO, Long> {

+    /**
+     * Find the corresponding workflow instance
+     * @param wfInstanceId instance ID
+     * @return workflow instance
+     */
     Optional<WorkflowInstanceInfoDO> findByWfInstanceId(Long wfInstanceId);

-    // Delete historical data. JPA's built-in delete loops over IDs one by one; deleting 2000 rows took several seconds, which is far too slow...
-    // the result can only be received as an int
+    /**
+     * Delete historical data. JPA's built-in delete loops over IDs one by one; deleting 2000 rows took several seconds, which is far too slow...
+     * the result can only be received as an int
+     * @param time update-time threshold
+     * @param status status list
+     * @return number of deleted records
+     */
     @Modifying
-    @Transactional
+    @Transactional(rollbackOn = Exception.class)
     @Query(value = "delete from WorkflowInstanceInfoDO where gmtModified < ?1 and status in ?2")
     int deleteAllByGmtModifiedBeforeAndStatusIn(Date time, List<Integer> status);

+    /**
+     * Count the instances of the given workflow that are in the given statuses
+     * @param workflowId workflow ID
+     * @param status status list
+     * @return number of matching instances
+     */
     int countByWorkflowIdAndStatusIn(Long workflowId, List<Integer> status);

-    // status check
+    /**
+     * Load the workflow instances whose expected trigger time is below the given threshold
+     * @param appIds application ID list
+     * @param status status
+     * @param time expected-trigger-time threshold
+     * @return list of workflow instances
+     */
     List<WorkflowInstanceInfoDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> appIds, int status, long time);
 }

diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServer.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServer.java
index 0da74566..4e64ab20 100644
--- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServer.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServer.java
@@ -17,8 +17,8 @@ import java.lang.annotation.Target;
 public @interface DesignateServer {

     /**
-     * Forwarding a request requires the currentServer info under AppInfo, so there must be an appId parameter; this attribute specifies the name of that parameter
+     * Forwarding a request requires the currentServer info under AppInfo, so there must be an appId parameter; this attribute specifies the name of that parameter, defaulting to "appId"
      * @return name of the appId parameter
      */
-    String appIdParameterName();
+    String appIdParameterName() default "appId";
 }

diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java
index 994ceebb..fc4e3002 100644
--- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java
@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.github.kfcfans.powerjob.common.PowerJobException;
 import com.github.kfcfans.powerjob.common.RemoteConstant;
 import com.github.kfcfans.powerjob.common.response.AskResponse;
+import org.springframework.core.annotation.Order;
 import tech.powerjob.server.persistence.remote.model.AppInfoDO;
 import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
 import tech.powerjob.server.remote.transport.starter.AkkaStarter;
@@ -37,12 +38,13 @@ import java.util.concurrent.CompletionStage;
 @Slf4j
 @Aspect
 @Component
+@Order(0)
 public class DesignateServerAspect {

     @Resource
     private AppInfoRepository appInfoRepository;

-    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

     @Around(value = "@annotation(designateServer)")
     public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable {
@@ -99,7 +101,7 @@
         Method method = methodSignature.getMethod();
         JavaType returnType = getMethodReturnJavaType(method);

-        return objectMapper.readValue(askResponse.getData(), returnType);
+        return OBJECT_MAPPER.readValue(askResponse.getData(), returnType);
     }
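
Because appIdParameterName() now defaults to "appId", a bare @DesignateServer is equivalent to the old @DesignateServer(appIdParameterName = "appId"), which is what permits the mechanical cleanup of all the call sites in this patch. For illustration, one way an aspect could resolve the appId argument from that possibly-defaulted name using plain reflection; this is a hypothetical helper that assumes compilation with -parameters, while the real aspect takes parameter names from AspectJ's method signature:

    import java.lang.reflect.Method;
    import java.lang.reflect.Parameter;

    class AppIdResolver {
        // Returns the index of the parameter whose name matches appIdParameterName().
        static int findAppIdIndex(Method method, DesignateServer designateServer) {
            Parameter[] parameters = method.getParameters();
            for (int i = 0; i < parameters.length; i++) {
                if (parameters[i].getName().equals(designateServer.appIdParameterName())) {
                    return i;
                }
            }
            throw new IllegalStateException("no parameter named " + designateServer.appIdParameterName());
        }
    }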
diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
index 59d0b95b..54a99fd2 100644
--- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
+++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
@@ -53,7 +53,7 @@ public class WorkerClusterQueryService {
         return workers;
     }

-    @DesignateServer(appIdParameterName = "appId")
+    @DesignateServer
     public List<WorkerInfo> getAllWorkers(Long appId) {
         List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());
         workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
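
Putting the pieces together, every @DesignateServer method now goes through the same flow: resolve the app's current server, proceed locally when this node owns the app, otherwise forward the call and map the JSON reply back onto the method's return type. A condensed paraphrase of DesignateServerAspect#execute; the forwarding helpers are stubbed out as hypothetical, and only the readValue call is verbatim from the patch:

    // Condensed paraphrase, not the verbatim source.
    @Around("@annotation(designateServer)")
    public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable {
        // locate the appId argument by designateServer.appIdParameterName() (hypothetical helper)
        Long appId = extractAppId(point, designateServer);
        AppInfoDO appInfo = appInfoRepository.findById(appId)
                .orElseThrow(() -> new PowerJobException("can't find appInfo by appId: " + appId));

        // this node owns the app: run locally; the @Order(1) lock aspect then takes over
        if (isLocalServer(appInfo.getCurrentServer())) {                              // hypothetical helper
            return point.proceed();
        }

        // otherwise ask the owning server to execute, then deserialize its reply
        AskResponse askResponse = forwardToServer(appInfo.getCurrentServer(), point); // hypothetical helper
        if (!askResponse.isSuccess()) {
            throw new PowerJobException("remote server process failed: " + askResponse.getMessage());
        }
        Method method = ((MethodSignature) point.getSignature()).getMethod();
        JavaType returnType = getMethodReturnJavaType(method);
        return OBJECT_MAPPER.readValue(askResponse.getData(), returnType);
    }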