diff --git a/powerjob-server/src/main/resources/banner.txt b/powerjob-server/src/main/resources/banner.txt index e0e1d9e7..dffacc32 100644 --- a/powerjob-server/src/main/resources/banner.txt +++ b/powerjob-server/src/main/resources/banner.txt @@ -9,5 +9,5 @@ ${AnsiColor.GREEN} ░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ ${AnsiColor.BRIGHT_RED} * Maintainer: tengjiqi@gmail.com -* SourceCode: https://github.com/KFCFans/OhMyScheduler +* SourceCode: https://github.com/KFCFans/PowerJob * PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java index a1f2f647..8ae65fcc 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.worker.actors; import akka.actor.AbstractActor; +import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; @@ -9,7 +10,6 @@ import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.powerjob.worker.persistence.TaskDO; -import com.github.kfcfans.powerjob.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorMapTaskRequest; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.powerjob.common.response.AskResponse; @@ -36,7 +36,6 @@ public class TaskTrackerActor extends AbstractActor { .match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq) .match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest) .match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq) - .match(BroadcastTaskPreExecuteFinishedReq.class, this::onReceiveBroadcastTaskPreExecuteFinishedReq) .match(ServerStopInstanceReq.class, this::onReceiveServerStopInstanceReq) .match(ServerQueryInstanceStatusReq.class, this::onReceiveServerQueryInstanceStatusReq) .matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj)) @@ -61,9 +60,14 @@ public class TaskTrackerActor extends AbstractActor { // 手动停止 TaskTracker 的情况下会出现这种情况 if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); - } else { - taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); + return; } + + if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) { + taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult()); + } + + taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); } /** @@ -104,19 +108,6 @@ public class TaskTrackerActor extends AbstractActor { getSender().tell(response, getSelf()); } - /** - * 广播任务前置任务执行完毕 处理器 - */ - private void onReceiveBroadcastTaskPreExecuteFinishedReq(BroadcastTaskPreExecuteFinishedReq req) { - - TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); - if (taskTracker == null) { - log.warn("[TaskTrackerActor] receive BroadcastTaskPreExecuteFinishedReq({}) but system can't find TaskTracker.", req); - return; - } - taskTracker.broadcast(req.isSuccess(), req.getSubInstanceId(), req.getTaskId(), req.getReportTime(), req.getMsg()); - } - /** * 服务器任务调度处理器 */ diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java index 1c6a9090..8ef445ff 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java @@ -1,10 +1,7 @@ package com.github.kfcfans.powerjob.worker.core.executor; import akka.actor.ActorSelection; -import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.ExecuteType; -import com.github.kfcfans.powerjob.common.RemoteConstant; -import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; @@ -16,7 +13,6 @@ import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService; import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo; -import com.github.kfcfans.powerjob.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; @@ -29,11 +25,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; -import java.time.Duration; import java.util.List; import java.util.Queue; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; /** * Processor 执行器 @@ -78,40 +71,31 @@ public class ProcessorRunnable implements Runnable { taskContext.setUserContext(OhMyWorker.getConfig().getUserContext()); ThreadLocalStore.setTask(task); - reportStatus(TaskStatus.WORKER_PROCESSING, null); + reportStatus(TaskStatus.WORKER_PROCESSING, null, null); // 1. 根任务特殊处理 + ProcessResult processResult; ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName())) { // 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task if (executeType == ExecuteType.BROADCAST) { - BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq(); - spReq.setTaskId(taskId); - spReq.setInstanceId(instanceId); - spReq.setSubInstanceId(task.getSubInstanceId()); - if (processor instanceof BroadcastProcessor) { BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; try { - ProcessResult processResult = broadcastProcessor.preProcess(taskContext); - spReq.setSuccess(processResult.isSuccess()); - spReq.setMsg(suit(processResult.getMsg())); + processResult = broadcastProcessor.preProcess(taskContext); }catch (Throwable e) { log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e); - spReq.setSuccess(false); - spReq.setMsg(e.toString()); + processResult = new ProcessResult(false, e.toString()); } }else { - spReq.setSuccess(true); - spReq.setMsg("NO_PREPOST_TASK"); + processResult = new ProcessResult(true, "NO_PREPOST_TASK"); } - spReq.setReportTime(System.currentTimeMillis()); - taskTrackerActor.tell(spReq, null); + reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST); // 广播执行的第一个 task 只执行 preProcess 部分 return; } @@ -123,7 +107,6 @@ public class ProcessorRunnable implements Runnable { Stopwatch stopwatch = Stopwatch.createStarted(); log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId); - ProcessResult lastResult; List taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, task.getSubInstanceId()); try { switch (executeType) { @@ -131,30 +114,30 @@ public class ProcessorRunnable implements Runnable { if (processor instanceof BroadcastProcessor) { BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; - lastResult = broadcastProcessor.postProcess(taskContext, taskResults); + processResult = broadcastProcessor.postProcess(taskContext, taskResults); }else { - lastResult = BroadcastProcessor.defaultResult(taskResults); + processResult = BroadcastProcessor.defaultResult(taskResults); } break; case MAP_REDUCE: if (processor instanceof MapReduceProcessor) { MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor; - lastResult = mapReduceProcessor.reduce(taskContext, taskResults); + processResult = mapReduceProcessor.reduce(taskContext, taskResults); }else { - lastResult = new ProcessResult(false, "not implement the MapReduceProcessor"); + processResult = new ProcessResult(false, "not implement the MapReduceProcessor"); } break; default: - lastResult = new ProcessResult(false, "IMPOSSIBLE OR BUG"); + processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG"); } }catch (Throwable e) { - lastResult = new ProcessResult(false, e.toString()); + processResult = new ProcessResult(false, e.toString()); log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e); } - TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; - reportStatus(status, suit(lastResult.getMsg())); + TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; + reportStatus(status, suit(processResult.getMsg()), null); log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch); return; @@ -162,27 +145,28 @@ public class ProcessorRunnable implements Runnable { // 3. 正式提交运行 - ProcessResult processResult; try { processResult = processor.process(taskContext); }catch (Throwable e) { log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e); processResult = new ProcessResult(false, e.toString()); } - reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg())); + reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null); } /** * 上报状态给 TaskTracker */ - private void reportStatus(TaskStatus status, String result) { + private void reportStatus(TaskStatus status, String result, Integer cmd) { ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq(); req.setInstanceId(task.getInstanceId()); + req.setSubInstanceId(task.getSubInstanceId()); req.setTaskId(task.getTaskId()); req.setStatus(status.getValue()); req.setResult(result); req.setReportTime(System.currentTimeMillis()); + req.setCmd(cmd); // 最终结束状态要求可靠发送 if (TaskStatus.finishedStatus.contains(status.getValue())) { @@ -205,7 +189,7 @@ public class ProcessorRunnable implements Runnable { innerRun(); }catch (InterruptedException ignore) { }catch (Throwable e) { - reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString()); + reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null); log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e); }finally { ThreadLocalStore.clear(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 4d0cd01b..bba0b3b1 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -113,7 +113,7 @@ public class ProcessorTracker { // 一旦 ProcessorTracker 出现异常,所有提交到此处的任务直接返回失败,防止形成死锁 // 死锁分析:TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T if (lethal) { - ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis()); + ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getSubInstanceId(), newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis(), null); taskTrackerActorRef.tell(report, null); return; } @@ -139,6 +139,7 @@ public class ProcessorTracker { if (success) { ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq(); reportReq.setInstanceId(instanceId); + reportReq.setSubInstanceId(newTask.getSubInstanceId()); reportReq.setTaskId(newTask.getTaskId()); reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); reportReq.setReportTime(System.currentTimeMillis()); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index f57e5811..e156c351 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -254,10 +254,9 @@ public abstract class TaskTracker { * @param preExecuteSuccess 预执行广播任务运行状态 * @param subInstanceId 子实例ID * @param preTaskId 预执行广播任务的taskId - * @param reportTime 上报时间 * @param result 预执行广播任务的结果 */ - public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, long reportTime, String result) { + public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) { if (finished.get()) { return; @@ -280,10 +279,6 @@ public abstract class TaskTracker { }else { log.debug("[TaskTracker-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, result); } - - // 2. 更新根任务状态(广播任务的根任务为 preProcess 任务) - int status = preExecuteSuccess ? TaskStatus.WORKER_PROCESS_SUCCESS.getValue() : TaskStatus.WORKER_PROCESS_FAILED.getValue(); - updateTaskStatus(preTaskId, status, reportTime, result); } /** diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java deleted file mode 100644 index c0f671b8..00000000 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.github.kfcfans.powerjob.worker.pojo.request; - -import com.github.kfcfans.powerjob.common.OmsSerializable; -import lombok.Data; - - -/** - * 广播任务 preExecute 结束信息 - * - * @author tjq - * @since 2020/3/23 - */ -@Data -public class BroadcastTaskPreExecuteFinishedReq implements OmsSerializable { - - private Long instanceId; - private Long subInstanceId; - private String taskId; - - private boolean success; - private String msg; - - // 上报时间 - private long reportTime; -} diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java index c936f644..9d33b8b4 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java @@ -17,7 +17,10 @@ import lombok.NoArgsConstructor; @AllArgsConstructor public class ProcessorReportTaskStatusReq implements OmsSerializable { + public static final Integer BROADCAST = 1; + private Long instanceId; + private Long subInstanceId; private String taskId; private int status; @@ -29,4 +32,6 @@ public class ProcessorReportTaskStatusReq implements OmsSerializable { // 上报时间 private long reportTime; + // 特殊请求名称 + private Integer cmd; }