fix: the bug of idle check #146

This commit is contained in:
tjq 2021-01-02 11:31:48 +08:00
parent 269d64065c
commit 3ecefd22cb
3 changed files with 8 additions and 4 deletions

View File

@ -314,7 +314,7 @@ public class WorkflowInstanceManager {
}
/**
* 允许任务实例
* 运行任务实例
* 需要将创建和运行任务实例分离否则在秒失败情况下会发生DAG覆盖更新的问题
* @param jobId 任务ID
* @param instanceId 任务实例ID

View File

@ -55,8 +55,10 @@ public class ProcessorTracker {
private OmsLogger omsLogger;
// ProcessResult 上报失败的重试队列
private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
// 上一次空闲时间
// 上一次空闲时间用于闲置判定
private long lastIdleTime;
// 上次完成任务数量用于闲置判定
private long lastCompletedTaskCount;
private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef;
@ -88,6 +90,7 @@ public class ProcessorTracker {
this.omsLogger = new OmsServerLogger(instanceId);
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
this.lastIdleTime = -1L;
this.lastCompletedTaskCount = 0L;
// 初始化 线程池TimingPool 启动的任务会检查 ThreadPool所以必须先初始化线程池否则NPE
initThreadPool();
@ -239,8 +242,9 @@ public class ProcessorTracker {
}
// 判断线程池活跃状态长时间空闲则上报 TaskTracker 请求检查
if (threadPool.getActiveCount() > 0) {
if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) {
lastIdleTime = -1;
lastCompletedTaskCount = threadPool.getCompletedTaskCount();
}else {
if (lastIdleTime == -1) {
lastIdleTime = System.currentTimeMillis();

View File

@ -446,7 +446,7 @@ public abstract class TaskTracker {
// 3. 避免大查询分批派发任务
long currentDispatchNum = 0;
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2;
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
AtomicInteger index = new AtomicInteger(0);
// 4. 循环查询数据库获取需要派发的任务