From 1b3134291c125345d72cc56caeafcd710ba033c1 Mon Sep 17 00:00:00 2001 From: Echo009 Date: Sun, 15 Jan 2023 21:04:32 +0800 Subject: [PATCH] feat: optimize the code of LightTaskTracker --- .../powerjob/common/SystemInstanceResult.java | 9 +++++ .../tracker/task/light/LightTaskTracker.java | 40 +++++++++++-------- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/powerjob-common/src/main/java/tech/powerjob/common/SystemInstanceResult.java b/powerjob-common/src/main/java/tech/powerjob/common/SystemInstanceResult.java index 55f59ad4..9c168664 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/SystemInstanceResult.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/SystemInstanceResult.java @@ -34,6 +34,15 @@ public class SystemInstanceResult { * 任务执行超时,强制终止任务 */ public static final String INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP= "instance execute timeout,force stop success"; + + /** + * 用户手动停止任务,成功打断任务 + */ + public static final String USER_STOP_INSTANCE_INTERRUPTED= "user stop instance,interrupted success"; + /** + * 用户手动停止任务,被系统强制终止 + */ + public static final String USER_STOP_INSTANCE_FORCE_STOP= "user stop instance,force stop success"; /** * 创建根任务失败 */ diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java index 011c8ee5..d46fc5dd 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java @@ -72,7 +72,7 @@ public class LightTaskTracker extends TaskTracker { */ private ProcessResult result; - private boolean timeoutFlag = false; + private final AtomicBoolean timeoutFlag = new AtomicBoolean(false); protected final AtomicBoolean stopFlag = new AtomicBoolean(false); @@ -95,8 +95,12 @@ public class LightTaskTracker extends TaskTracker { statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS); // 超时控制 if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) { - // 超时控制的最小颗粒度为 1 s - timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000, TimeUnit.MILLISECONDS); + if (instanceInfo.getInstanceTimeoutMS() < 1000L) { + timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS); + } else { + // 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s + timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS); + } } else { timeoutCheckScheduledFuture = null; } @@ -143,7 +147,7 @@ public class LightTaskTracker extends TaskTracker { } LightTaskTrackerManager.removeTaskTracker(instanceId); // 最后一列为总耗时(即占用资源的耗时,当前时间减去创建时间) - log.warn("[TaskTracker-{}] remove TaskTracker,task status {},start time:{},end time:{},real cost:{},total time:{}", instanceId, status, taskStartTime, taskEndTime, taskEndTime - taskStartTime, System.currentTimeMillis() - createTime); + log.warn("[TaskTracker-{}] remove TaskTracker,task status {},start time:{},end time:{},real cost:{},total time:{}", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime); } @Override @@ -207,7 +211,13 @@ public class LightTaskTracker extends TaskTracker { } catch (InterruptedException e) { log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e); Thread.currentThread().interrupt(); - res = new ProcessResult(false, e.toString()); + if (timeoutFlag.get()) { + res = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED); + } else if (stopFlag.get()) { + res = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_INTERRUPTED); + } else { + res = new ProcessResult(false, e.toString()); + } } catch (Exception e) { log.warn("[TaskTracker-{}] process failed !", instanceId, e); res = new ProcessResult(false, e.toString()); @@ -216,14 +226,10 @@ public class LightTaskTracker extends TaskTracker { log.warn("[TaskTracker-{}] processor return null !", instanceId); res = new ProcessResult(false, "Processor return null"); } - } while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag); + } while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag.get() && !stopFlag.get()); executeThread.set(null); taskEndTime = System.currentTimeMillis(); finished.set(true); - // 成功的情况下允许覆盖超时控制赋予的结果值 - if (timeoutFlag && !res.isSuccess()) { - return res; - } result = res; status = result.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; // 取消超时检查任务 @@ -258,12 +264,19 @@ public class LightTaskTracker extends TaskTracker { reportInstanceStatusReq.setFailedTaskNum(0); if (stopFlag.get()) { + if (finished.get()) { + // 已经被成功打断 + destroy(); + return; + } final Thread workerThread = executeThread.get(); if (!finished.get() && workerThread != null) { // 未能成功打断任务,强制停止 try { if (tryForceStopThread(workerThread)) { finished.set(true); + taskEndTime = System.currentTimeMillis(); + result = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_FORCE_STOP); log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName()); // 被终止的任务不需要上报状态 destroy(); @@ -308,20 +321,15 @@ public class LightTaskTracker extends TaskTracker { return; } // 首次判断超时 - if (!timeoutFlag) { + if (timeoutFlag.compareAndSet(false, true)) { // 超时,仅尝试打断任务 log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},currentTime:{},runningTimeLimit:{}, try to interrupt it.", instanceId, taskStartTime, System.currentTimeMillis(), instanceInfo.getInstanceTimeoutMS()); processFuture.cancel(true); - timeoutFlag = true; return; } if (finished.get()) { // 已经成功被打断 log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},endTime:{}, interrupt success.", instanceId, taskStartTime, taskEndTime); - if (result == null) { - taskEndTime = System.currentTimeMillis(); - result = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED); - } return; } Thread workerThread = executeThread.get();