feat: optimize the code of LightTaskTracker

This commit is contained in:
Echo009 2023-01-15 21:04:32 +08:00
parent 2c51e0601d
commit 1b3134291c
2 changed files with 33 additions and 16 deletions

View File

@ -34,6 +34,15 @@ public class SystemInstanceResult {
* 任务执行超时强制终止任务
*/
public static final String INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP= "instance execute timeout,force stop success";
/**
* 用户手动停止任务成功打断任务
*/
public static final String USER_STOP_INSTANCE_INTERRUPTED= "user stop instance,interrupted success";
/**
* 用户手动停止任务被系统强制终止
*/
public static final String USER_STOP_INSTANCE_FORCE_STOP= "user stop instance,force stop success";
/**
* 创建根任务失败
*/

View File

@ -72,7 +72,7 @@ public class LightTaskTracker extends TaskTracker {
*/
private ProcessResult result;
private boolean timeoutFlag = false;
private final AtomicBoolean timeoutFlag = new AtomicBoolean(false);
protected final AtomicBoolean stopFlag = new AtomicBoolean(false);
@ -95,8 +95,12 @@ public class LightTaskTracker extends TaskTracker {
statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS);
// 超时控制
if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
// 超时控制的最小颗粒度为 1 s
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000, TimeUnit.MILLISECONDS);
if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
} else {
// 执行时间超过 1 s 的任务超时检测最小颗粒度为 1 s
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
}
} else {
timeoutCheckScheduledFuture = null;
}
@ -143,7 +147,7 @@ public class LightTaskTracker extends TaskTracker {
}
LightTaskTrackerManager.removeTaskTracker(instanceId);
// 最后一列为总耗时即占用资源的耗时当前时间减去创建时间
log.warn("[TaskTracker-{}] remove TaskTracker,task status {},start time:{},end time:{},real cost:{},total time:{}", instanceId, status, taskStartTime, taskEndTime, taskEndTime - taskStartTime, System.currentTimeMillis() - createTime);
log.warn("[TaskTracker-{}] remove TaskTracker,task status {},start time:{},end time:{},real cost:{},total time:{}", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime);
}
@Override
@ -207,7 +211,13 @@ public class LightTaskTracker extends TaskTracker {
} catch (InterruptedException e) {
log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e);
Thread.currentThread().interrupt();
res = new ProcessResult(false, e.toString());
if (timeoutFlag.get()) {
res = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED);
} else if (stopFlag.get()) {
res = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_INTERRUPTED);
} else {
res = new ProcessResult(false, e.toString());
}
} catch (Exception e) {
log.warn("[TaskTracker-{}] process failed !", instanceId, e);
res = new ProcessResult(false, e.toString());
@ -216,14 +226,10 @@ public class LightTaskTracker extends TaskTracker {
log.warn("[TaskTracker-{}] processor return null !", instanceId);
res = new ProcessResult(false, "Processor return null");
}
} while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag);
} while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag.get() && !stopFlag.get());
executeThread.set(null);
taskEndTime = System.currentTimeMillis();
finished.set(true);
// 成功的情况下允许覆盖超时控制赋予的结果值
if (timeoutFlag && !res.isSuccess()) {
return res;
}
result = res;
status = result.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
// 取消超时检查任务
@ -258,12 +264,19 @@ public class LightTaskTracker extends TaskTracker {
reportInstanceStatusReq.setFailedTaskNum(0);
if (stopFlag.get()) {
if (finished.get()) {
// 已经被成功打断
destroy();
return;
}
final Thread workerThread = executeThread.get();
if (!finished.get() && workerThread != null) {
// 未能成功打断任务强制停止
try {
if (tryForceStopThread(workerThread)) {
finished.set(true);
taskEndTime = System.currentTimeMillis();
result = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_FORCE_STOP);
log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName());
// 被终止的任务不需要上报状态
destroy();
@ -308,20 +321,15 @@ public class LightTaskTracker extends TaskTracker {
return;
}
// 首次判断超时
if (!timeoutFlag) {
if (timeoutFlag.compareAndSet(false, true)) {
// 超时仅尝试打断任务
log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},currentTime:{},runningTimeLimit:{}, try to interrupt it.", instanceId, taskStartTime, System.currentTimeMillis(), instanceInfo.getInstanceTimeoutMS());
processFuture.cancel(true);
timeoutFlag = true;
return;
}
if (finished.get()) {
// 已经成功被打断
log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},endTime:{}, interrupt success.", instanceId, taskStartTime, taskEndTime);
if (result == null) {
taskEndTime = System.currentTimeMillis();
result = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED);
}
return;
}
Thread workerThread = executeThread.get();