fix: task status transfer anomaly, #404

This commit is contained in:
Echo009 2022-05-19 20:13:00 +08:00
parent a9a0422de1
commit 0c4eb3834a

View File

@ -2,6 +2,7 @@ package tech.powerjob.worker.core.tracker.task;
import akka.actor.ActorSelection;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.AllArgsConstructor;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.RemoteConstant;
@ -94,9 +95,10 @@ public abstract class TaskTracker {
*/
protected final Map<String, String> appendedWfContext;
/**
* 上报时间缓存
* 任务信息缓存
*/
private final Cache<String, Long> taskId2LastReportTime;
private final Cache<String, TaskBriefInfo> taskId2BriefInfo;
/**
* 分段锁
@ -128,7 +130,7 @@ public abstract class TaskTracker {
// 只有工作流中的任务允许向工作流中追加上下文数据
this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap();
// 构建缓存
taskId2LastReportTime = CacheBuilder.newBuilder().maximumSize(1024).build();
taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).build();
// 构建分段锁
segmentLock = new SegmentLock(UPDATE_CONCURRENCY);
@ -191,7 +193,7 @@ public abstract class TaskTracker {
}
// 检查追加的上下文大小是否超出限制
if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) {
log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength());
log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!", instanceInfo.getInstanceId(), workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength());
// ignore appended workflow context data
return;
}
@ -214,7 +216,7 @@ public abstract class TaskTracker {
* @param reportTime 上报时间
* @param result task的执行结果未执行完成时为空
*/
@SuppressWarnings({"squid:S3776","squid:S2142"})
@SuppressWarnings({"squid:S3776", "squid:S2142"})
public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
if (finished.get()) {
@ -227,33 +229,39 @@ public abstract class TaskTracker {
// 阻塞获取锁
segmentLock.lockInterruptible(lockId);
Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId);
TaskBriefInfo taskBriefInfo = taskId2BriefInfo.getIfPresent(taskId);
// 缓存中不存在从数据库查
if (lastReportTime == null) {
if (taskBriefInfo == null) {
Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
if (taskOpt.isPresent()) {
lastReportTime = taskOpt.get().getLastReportTime();
TaskDO taskDO = taskOpt.get();
taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime());
} else {
// 理论上不存在这种情况除非数据库异常
log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L);
}
// 写入缓存
taskId2BriefInfo.put(taskId, taskBriefInfo);
}
if (lastReportTime == null) {
lastReportTime = -1L;
}
}
// 过滤过期的请求潜在的集群时间一致性需求重试跨Worker时时间不一致可能导致问题
if (lastReportTime > reportTime) {
// 过滤过期的请求潜在的集群时间一致性需求重试跨 Worker 时间不一致可能导致问题
if (taskBriefInfo.getLastReportTime() > reportTime) {
log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
instanceId, subInstanceId, lastReportTime, reportTime, taskId, newStatus);
instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus);
return;
}
// 检查状态转移是否合法fix issue 404
if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) {
log.warn("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.",
instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus);
return;
}
// 此时本次请求已经有效先写入最新的时间
taskId2LastReportTime.put(taskId, reportTime);
// 此时本次请求已经有效先更新相关信息
taskBriefInfo.setLastReportTime(reportTime);
taskBriefInfo.setStatus(nTaskStatus);
// 处理失败的情况
int configTaskRetryNum = instanceInfo.getTaskRetryNum();
@ -458,7 +466,7 @@ public abstract class TaskTracker {
// 2. 更新 ProcessorTrackerStatus 状态
ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
// 3. 初始化缓存
taskId2LastReportTime.put(task.getTaskId(), -1L);
taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));
// 4. 任务派发
TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());
@ -578,6 +586,17 @@ public abstract class TaskTracker {
}
}
@Data
@AllArgsConstructor
protected static class TaskBriefInfo {
private String id;
private TaskStatus status;
private Long lastReportTime;
}
/**
* 存储任务实例产生的各个Task状态用于分析任务实例执行情况
*/