diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index ee105c3f..f3546db1 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -61,7 +61,7 @@ public abstract class TaskTracker { protected static final int MAX_REPORT_FAILED_THRESHOLD = 5; - public TaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { + protected TaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { this.createTime = System.currentTimeMillis(); this.workerRuntime = workerRuntime; this.instanceId = req.getInstanceId(); @@ -112,7 +112,7 @@ public abstract class TaskTracker { serverActor.tell(response, null); } - protected void reportFinalStatusThenDestory(ActorSelection serverActor, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) { + protected void reportFinalStatusThenDestroy(ActorSelection serverActor, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) { // 最终状态需要可靠上报 CompletionStage ask = Patterns.ask(serverActor, reportInstanceStatusReq, Duration.ofSeconds(15)); boolean serverAccepted = false; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index e8bb2fd7..ef9c2e0a 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -234,35 +234,11 @@ public class CommonTaskTracker extends HeavyTaskTracker { // 4. 执行完毕,报告服务器 if (finished.get()) { - req.setResult(result); // 上报追加的工作流上下文信息 req.setAppendedWfContext(appendedWfContext); req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV()); - - CompletionStage askCS = Patterns.ask(serverActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); - - boolean serverAccepted = false; - try { - AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - serverAccepted = askResponse.isSuccess(); - } catch (Exception e) { - log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result, e); - } - - // 服务器未接受上报,则等待下次重新上报 - if (!serverAccepted) { - if (++reportFailedCnt > MAX_REPORT_FAILED_THRESHOLD) { - log.error("[TaskTracker-{}] try to report finished status(success={}, result={}) lots of times but all failed, it's time to give up, so the process result will be dropped", instanceId, success, result); - destroy(); - } - return; - } - - // 服务器已经更新状态,任务已经执行完毕,开始释放所有资源 - log.info("[TaskTracker-{}] instance process finished,result = {}, start to release resource...", instanceId, result); - - destroy(); + reportFinalStatusThenDestroy(serverActor,req); return; } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java index 72c22a5b..011c8ee5 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java @@ -59,10 +59,6 @@ public class LightTaskTracker extends TaskTracker { * 任务状态 */ private TaskStatus status; - /** - * 创建时间 - */ - private final Long createTime; /** * 任务开始执行的时间 */ @@ -80,12 +76,13 @@ public class LightTaskTracker extends TaskTracker { protected final AtomicBoolean stopFlag = new AtomicBoolean(false); + protected final AtomicBoolean destroyFlag = new AtomicBoolean(false); + public LightTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { super(req, workerRuntime); try { taskContext = constructTaskContext(req, workerRuntime); - createTime = System.currentTimeMillis(); // 等待处理 status = TaskStatus.WORKER_RECEIVED; // 加载 Processor @@ -131,6 +128,10 @@ public class LightTaskTracker extends TaskTracker { @Override public void destroy() { + if (!destroyFlag.compareAndSet(false, true)) { + log.warn("[TaskTracker-{}] This TaskTracker has been destroyed!", instanceId); + return; + } if (statusReportScheduledFuture != null) { statusReportScheduledFuture.cancel(true); } @@ -153,11 +154,10 @@ public class LightTaskTracker extends TaskTracker { log.warn("[TaskTracker-{}] fail to stop task,task is finished!result:{}", instanceId, result); return; } - if (stopFlag.get()) { + if (!stopFlag.compareAndSet(false, true)) { log.warn("[TaskTracker-{}] task has been mark as stopped,ignore this request!", instanceId); return; } - stopFlag.set(true); // 当前任务尚未执行 if (status == TaskStatus.WORKER_RECEIVED) { log.warn("[TaskTracker-{}] task is not started,destroy this taskTracker directly!", instanceId); @@ -206,6 +206,7 @@ public class LightTaskTracker extends TaskTracker { res = processorInfo.getBasicProcessor().process(taskContext); } catch (InterruptedException e) { log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e); + Thread.currentThread().interrupt(); res = new ProcessResult(false, e.toString()); } catch (Exception e) { log.warn("[TaskTracker-{}] process failed !", instanceId, e); @@ -236,14 +237,12 @@ public class LightTaskTracker extends TaskTracker { } - /* - TODO:建议添加 锁 + 销毁判断,2种极限场景: - 1. 先 processTask 调用,同时定时任务再次触发:理论上定时任务的处理栈上会抛出 InterruptException(虽然无实际功能的影响,但不够优雅,而且总归是问题) - 2. 先定时任务触发,同时 processTask 调用:这种情况好像没太大的问题,重复上报2次 - - 总而言之建议考虑理论情况下的"并行"场景,价格锁和入口状态判断增加代码健壮性 - */ - private void checkAndReportStatus() { + private synchronized void checkAndReportStatus() { + if (destroyFlag.get()) { + // 已经被销毁,不需要上报状态 + log.info("[TaskTracker-{}] has been destroyed,final status is {},needn't to report status!", instanceId, status); + return; + } String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath); TaskTrackerReportInstanceStatusReq reportInstanceStatusReq = new TaskTrackerReportInstanceStatusReq(); @@ -268,6 +267,7 @@ public class LightTaskTracker extends TaskTracker { log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName()); // 被终止的任务不需要上报状态 destroy(); + return; } } catch (Exception e) { log.warn("[TaskTracker-{}] task need stop,fail to stop thread {}", instanceId, workerThread.getName(), e); @@ -290,7 +290,7 @@ public class LightTaskTracker extends TaskTracker { reportInstanceStatusReq.setEndTime(taskEndTime); // 微操一下,上报最终状态时重新设置下时间,并且增加一小段偏移,保证在并发上报运行中状态以及最终状态时,最终状态的上报时间晚于运行中的状态 reportInstanceStatusReq.setReportTime(System.currentTimeMillis() + 1); - reportFinalStatusThenDestory(serverActor, reportInstanceStatusReq); + reportFinalStatusThenDestroy(serverActor, reportInstanceStatusReq); return; } // 未完成的任务,只需要上报状态 @@ -310,7 +310,7 @@ public class LightTaskTracker extends TaskTracker { // 首次判断超时 if (!timeoutFlag) { // 超时,仅尝试打断任务 - log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},currentTime:{},runningTimeLimit:{}, try to interrupt it.", instanceId, taskStartTime, System.currentTimeMillis(),instanceInfo.getInstanceTimeoutMS()); + log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},currentTime:{},runningTimeLimit:{}, try to interrupt it.", instanceId, taskStartTime, System.currentTimeMillis(), instanceInfo.getInstanceTimeoutMS()); processFuture.cancel(true); timeoutFlag = true; return; @@ -342,20 +342,20 @@ public class LightTaskTracker extends TaskTracker { } private TaskContext constructTaskContext(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { - final TaskContext taskContext = new TaskContext(); - taskContext.setTaskId(req.getJobId() + "#" + req.getInstanceId()); - taskContext.setJobId(req.getJobId()); - taskContext.setJobParams(req.getJobParams()); - taskContext.setInstanceId(req.getInstanceId()); - taskContext.setInstanceParams(req.getInstanceParams()); - taskContext.setWorkflowContext(new WorkflowContext(req.getWfInstanceId(), req.getInstanceParams())); - taskContext.setOmsLogger(OmsLoggerFactory.build(req.getInstanceId(), req.getLogConfig(), workerRuntime)); - taskContext.setTaskName(TaskConstant.ROOT_TASK_NAME); - taskContext.setMaxRetryTimes(req.getTaskRetryNum()); - taskContext.setCurrentRetryTimes(0); - taskContext.setUserContext(workerRuntime.getWorkerConfig().getUserContext()); + final TaskContext context = new TaskContext(); + context.setTaskId(req.getJobId() + "#" + req.getInstanceId()); + context.setJobId(req.getJobId()); + context.setJobParams(req.getJobParams()); + context.setInstanceId(req.getInstanceId()); + context.setInstanceParams(req.getInstanceParams()); + context.setWorkflowContext(new WorkflowContext(req.getWfInstanceId(), req.getInstanceParams())); + context.setOmsLogger(OmsLoggerFactory.build(req.getInstanceId(), req.getLogConfig(), workerRuntime)); + context.setTaskName(TaskConstant.ROOT_TASK_NAME); + context.setMaxRetryTimes(req.getTaskRetryNum()); + context.setCurrentRetryTimes(0); + context.setUserContext(workerRuntime.getWorkerConfig().getUserContext()); // 轻量级任务不会涉及到任务分片的处理,不需要处理子任务相关的信息 - return taskContext; + return context; } private String suit(String result) { @@ -373,6 +373,7 @@ public class LightTaskTracker extends TaskTracker { /** * try force stop thread + * * @param thread thread * @return stop result */