diff --git a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java index a7dfdb9f..5e1cf0da 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java @@ -24,6 +24,12 @@ public class PowerJobDKey { public static final String IGNORED_NETWORK_INTERFACE_REGEX = "powerjob.network.interface.ignored"; public static final String WORKER_STATUS_CHECK_PERIOD = "powerjob.worker.status-check.normal.period"; + + /** + * Allows PowerJob to invoke Thread#stop to kill a thread when PowerJob can't interrupt it. + * It's VERY dangerous, so it only takes effect when this property is set to "true". + */ + public static final String WORKER_ALLOWED_FORCE_STOP_THREAD = "powerjob.worker.allowed-force-stop-thread"; /** * ms */ diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index 899c8265..ee105c3f 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -5,7 +5,6 @@ import akka.pattern.Patterns; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; -import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerScheduleJobReq; @@ -113,7 +112,7 @@ public abstract class TaskTracker { serverActor.tell(response, null); } - protected void reportFinalStatus(ActorSelection serverActor, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) { + protected void reportFinalStatusThenDestory(ActorSelection serverActor, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) { // 最终状态需要可靠上报 CompletionStage ask = 
Patterns.ask(serverActor, reportInstanceStatusReq, Duration.ofSeconds(15)); boolean serverAccepted = false; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java index 182135c1..72c22a5b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java @@ -236,6 +236,13 @@ public class LightTaskTracker extends TaskTracker { } + /* + TODO:建议添加 锁 + 销毁判断,2种极限场景: + 1. 先 processTask 调用,同时定时任务再次触发:理论上定时任务的处理栈上会抛出 InterruptedException(虽然无实际功能的影响,但不够优雅,而且总归是问题) + 2. 先定时任务触发,同时 processTask 调用:这种情况好像没太大的问题,重复上报2次 + + 总而言之建议考虑理论情况下的"并行"场景,加个锁和入口状态判断增加代码健壮性 + */ private void checkAndReportStatus() { String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath); @@ -255,13 +262,13 @@ public class LightTaskTracker extends TaskTracker { final Thread workerThread = executeThread.get(); if (!finished.get() && workerThread != null) { // 未能成功打断任务,强制停止 - log.warn("[TaskTracker-{}] task need stop,but fail to interrupt it,force stop thread {}", instanceId, executeThread.get().getName()); try { - workerThread.stop(); - finished.set(true); - log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName()); - // 被终止的任务不需要上报状态 - destroy(); + if (tryForceStopThread(workerThread)) { + finished.set(true); + log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName()); + // 被终止的任务不需要上报状态 + destroy(); + } } catch (Exception e) { log.warn("[TaskTracker-{}] task need stop,fail to stop thread {}", instanceId, workerThread.getName(), e); } @@ -283,7 +290,7 @@ public 
class LightTaskTracker extends TaskTracker { reportInstanceStatusReq.setEndTime(taskEndTime); // 微操一下,上报最终状态时重新设置下时间,并且增加一小段偏移,保证在并发上报运行中状态以及最终状态时,最终状态的上报时间晚于运行中的状态 reportInstanceStatusReq.setReportTime(System.currentTimeMillis() + 1); - reportFinalStatus(serverActor, reportInstanceStatusReq); + reportFinalStatusThenDestory(serverActor, reportInstanceStatusReq); return; } // 未完成的任务,只需要上报状态 @@ -322,13 +329,13 @@ public class LightTaskTracker extends TaskTracker { return; } // 未能成功打断任务,强制终止 - log.warn("[TaskTracker-{}] task timeout,but fail to interrupt it,force stop thread {}", instanceId, workerThread.getName()); try { - workerThread.stop(); - finished.set(true); - taskEndTime = System.currentTimeMillis(); - result = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP); - log.warn("[TaskTracker-{}] task timeout, force stop thread {} success!", instanceId, workerThread.getName()); + if (tryForceStopThread(workerThread)) { + finished.set(true); + taskEndTime = System.currentTimeMillis(); + result = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP); + log.warn("[TaskTracker-{}] task timeout, force stop thread {} success!", instanceId, workerThread.getName()); + } } catch (Exception e) { log.warn("[TaskTracker-{}] task timeout,fail to stop thread {}", instanceId, workerThread.getName(), e); } @@ -364,4 +371,29 @@ public class LightTaskTracker extends TaskTracker { return result.substring(0, maxLength).concat("..."); } + /** + * try force stop thread + * @param thread thread + * @return stop result + */ + private boolean tryForceStopThread(Thread thread) { + + String threadName = thread.getName(); + + String allowStopThread = System.getProperty(PowerJobDKey.WORKER_ALLOWED_FORCE_STOP_THREAD); + if (!StringUtils.equalsIgnoreCase(allowStopThread, Boolean.TRUE.toString())) { + log.warn("[TaskTracker-{}] PowerJob not allowed to force stop a thread by config", instanceId); + return false; + } + + 
log.warn("[TaskTracker-{}] fail to interrupt the thread[{}], try to force stop.", instanceId, threadName); + try { + thread.stop(); + return true; + } catch (Throwable t) { + log.warn("[TaskTracker-{}] stop thread[{}] failed, msg: {}", instanceId, threadName, t.getMessage()); + } + return false; + } + }