From 0c4eb3834a292432ad6a4729c1a6a9c340bb9c7f Mon Sep 17 00:00:00 2001 From: Echo009 Date: Thu, 19 May 2022 20:13:00 +0800 Subject: [PATCH] fix: task status transfer anomaly, #404 --- .../worker/core/tracker/task/TaskTracker.java | 57 ++++++++++++------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index 20182311..fdf35fa9 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -2,6 +2,7 @@ package tech.powerjob.worker.core.tracker.task; import akka.actor.ActorSelection; import com.fasterxml.jackson.core.type.TypeReference; +import lombok.AllArgsConstructor; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.RemoteConstant; @@ -94,9 +95,10 @@ public abstract class TaskTracker { */ protected final Map appendedWfContext; /** - * 上报时间缓存 + * 任务信息缓存 */ - private final Cache taskId2LastReportTime; + private final Cache taskId2BriefInfo; + /** * 分段锁 @@ -128,7 +130,7 @@ public abstract class TaskTracker { // 只有工作流中的任务允许向工作流中追加上下文数据 this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap(); // 构建缓存 - taskId2LastReportTime = CacheBuilder.newBuilder().maximumSize(1024).build(); + taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).build(); // 构建分段锁 segmentLock = new SegmentLock(UPDATE_CONCURRENCY); @@ -191,7 +193,7 @@ public abstract class TaskTracker { } // 检查追加的上下文大小是否超出限制 if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) { - log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength()); + log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!", instanceInfo.getInstanceId(), workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength()); // ignore appended workflow context data return; } @@ -214,7 +216,7 @@ public abstract class TaskTracker { * @param reportTime 上报时间 * @param result task的执行结果,未执行完成时为空 */ - @SuppressWarnings({"squid:S3776","squid:S2142"}) + @SuppressWarnings({"squid:S3776", "squid:S2142"}) public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { if (finished.get()) { @@ -227,33 +229,39 @@ public abstract class TaskTracker { // 阻塞获取锁 segmentLock.lockInterruptible(lockId); - - Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId); + TaskBriefInfo taskBriefInfo = taskId2BriefInfo.getIfPresent(taskId); // 缓存中不存在,从数据库查 - if (lastReportTime == null) { + if (taskBriefInfo == null) { Optional taskOpt = taskPersistenceService.getTask(instanceId, taskId); if (taskOpt.isPresent()) { - lastReportTime = taskOpt.get().getLastReportTime(); + TaskDO taskDO = taskOpt.get(); + taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime()); } else { // 理论上不存在这种情况,除非数据库异常 log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId); + taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L); } - - if (lastReportTime == null) { - lastReportTime = -1L; - } + // 写入缓存 + taskId2BriefInfo.put(taskId, taskBriefInfo); } - // 过滤过期的请求(潜在的集群时间一致性需求,重试跨Worker时,时间不一致可能导致问题) - if (lastReportTime > reportTime) { + // 过滤过期的请求(潜在的集群时间一致性需求,重试跨 Worker 时,时间不一致可能导致问题) + if (taskBriefInfo.getLastReportTime() > reportTime) { log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.", - instanceId, subInstanceId, lastReportTime, reportTime, taskId, newStatus); + instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus); + return; + } + // 检查状态转移是否合法,fix issue 404 + if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) { + log.warn("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.", + instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus); return; } - // 此时本次请求已经有效,先写入最新的时间 - taskId2LastReportTime.put(taskId, reportTime); + // 此时本次请求已经有效,先更新相关信息 + taskBriefInfo.setLastReportTime(reportTime); + taskBriefInfo.setStatus(nTaskStatus); // 处理失败的情况 int configTaskRetryNum = instanceInfo.getTaskRetryNum(); @@ -458,7 +466,7 @@ public abstract class TaskTracker { // 2. 更新 ProcessorTrackerStatus 状态 ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true); // 3. 初始化缓存 - taskId2LastReportTime.put(task.getTaskId(), -1L); + taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L)); // 4. 任务派发 TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress()); @@ -578,6 +586,17 @@ public abstract class TaskTracker { } } + @Data + @AllArgsConstructor + protected static class TaskBriefInfo { + + private String id; + + private TaskStatus status; + + private Long lastReportTime; + } + /** * 存储任务实例产生的各个Task状态,用于分析任务实例执行情况 */