fix: never update lastActiveTime which lead to TIMEOUT for frequentJob(thanks @Y)

This commit is contained in:
tjq 2020-10-08 16:54:47 +08:00
parent a9e4ed6262
commit a67adf9601
4 changed files with 20 additions and 5 deletions

View File

@ -36,6 +36,7 @@ public class BroadcastProcessorDemo extends BroadcastProcessor {
public ProcessResult process(TaskContext taskContext) throws Exception {
System.out.println("===== BroadcastProcessorDemo#process ======");
taskContext.getOmsLogger().info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost());
Thread.sleep(45 * 1000);
return new ProcessResult(true);
}

View File

@ -65,7 +65,7 @@ public class TaskTrackerActor extends AbstractActor {
taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult());
}
taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
}
/**

View File

@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -104,6 +105,14 @@ public class FrequentTaskTracker extends TaskTracker {
scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS);
}
@Override
public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
super.updateTaskStatus(subInstanceId, taskId, newStatus, reportTime, result);
// 更新 LastActiveTime
SubInstanceTimeHolder timeHolder = subInstanceId2TimeHolder.get(subInstanceId);
timeHolder.lastActiveTime = Math.max(reportTime, timeHolder.lastActiveTime);
}
@Override
public InstanceDetail fetchRunningStatus() {
InstanceDetail detail = new InstanceDetail();
@ -243,9 +252,13 @@ public class FrequentTaskTracker extends TaskTracker {
long heartbeatTimeout = nowTS - timeHolder.lastActiveTime;
// 超时包含总运行时间超时和心跳包超时直接判定为失败
if (executeTimeout > instanceTimeoutMS || heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) {
if (executeTimeout > instanceTimeoutMS) {
onFinished(subInstanceId, false, "RUNNING_TIMEOUT", iterator);
continue;
}
onFinished(subInstanceId, false, "TIMEOUT", iterator);
if (heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) {
onFinished(subInstanceId, false, "HEARTBEAT_TIMEOUT", iterator);
continue;
}

View File

@ -138,12 +138,13 @@ public abstract class TaskTracker {
/**
* 更新Task状态
* V1.0.0 -> V1.0.1e405e283ad7f97b0b4e5d369c7de884c0caf9192 锁方案变更 synchronized (taskId.intern()) 修改为分段锁能大大减少内存占用损失的只有理论并发度而已
* @param subInstanceId 子任务实例ID
* @param taskId task的IDtask为任务实例的执行单位
* @param newStatus task的新状态
* @param reportTime 上报时间
* @param result task的执行结果未执行完成时为空
*/
public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) {
public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
if (finished.get()) {
return;
@ -278,7 +279,7 @@ public abstract class TaskTracker {
List<TaskDO> unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress);
if (!CollectionUtils.isEmpty(unfinishedTask)) {
log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask);
unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
}
}
}