[opt] optimized code for ProcessorRunnable

This commit is contained in:
tjq 2020-06-16 17:47:17 +08:00
parent 40682bbd34
commit 694cb0ab3c
7 changed files with 36 additions and 85 deletions

View File

@ -9,5 +9,5 @@ ${AnsiColor.GREEN}
░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░
${AnsiColor.BRIGHT_RED}
* Maintainer: tengjiqi@gmail.com
* SourceCode: https://github.com/KFCFans/OhMyScheduler
* SourceCode: https://github.com/KFCFans/PowerJob
* PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4)

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.worker.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
@ -9,7 +10,6 @@ import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorMapTaskRequest;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.powerjob.common.response.AskResponse;
@ -36,7 +36,6 @@ public class TaskTrackerActor extends AbstractActor {
.match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq)
.match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest)
.match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq)
.match(BroadcastTaskPreExecuteFinishedReq.class, this::onReceiveBroadcastTaskPreExecuteFinishedReq)
.match(ServerStopInstanceReq.class, this::onReceiveServerStopInstanceReq)
.match(ServerQueryInstanceStatusReq.class, this::onReceiveServerQueryInstanceStatusReq)
.matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj))
@ -61,9 +60,14 @@ public class TaskTrackerActor extends AbstractActor {
// 手动停止 TaskTracker 的情况下会出现这种情况
if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
} else {
taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
return;
}
if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) {
taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult());
}
taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
}
/**
@ -104,19 +108,6 @@ public class TaskTrackerActor extends AbstractActor {
getSender().tell(response, getSelf());
}
/**
* 广播任务前置任务执行完毕 处理器
*/
private void onReceiveBroadcastTaskPreExecuteFinishedReq(BroadcastTaskPreExecuteFinishedReq req) {
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive BroadcastTaskPreExecuteFinishedReq({}) but system can't find TaskTracker.", req);
return;
}
taskTracker.broadcast(req.isSuccess(), req.getSubInstanceId(), req.getTaskId(), req.getReportTime(), req.getMsg());
}
/**
* 服务器任务调度处理器
*/

View File

@ -1,10 +1,7 @@
package com.github.kfcfans.powerjob.worker.core.executor;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
@ -16,7 +13,6 @@ import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.powerjob.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
@ -29,11 +25,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import java.time.Duration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* Processor 执行器
@ -78,40 +71,31 @@ public class ProcessorRunnable implements Runnable {
taskContext.setUserContext(OhMyWorker.getConfig().getUserContext());
ThreadLocalStore.setTask(task);
reportStatus(TaskStatus.WORKER_PROCESSING, null);
reportStatus(TaskStatus.WORKER_PROCESSING, null, null);
// 1. 根任务特殊处理
ProcessResult processResult;
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName())) {
// 广播执行先选本机执行 preProcess完成后TaskTracker再为所有Worker生成子Task
if (executeType == ExecuteType.BROADCAST) {
BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
spReq.setTaskId(taskId);
spReq.setInstanceId(instanceId);
spReq.setSubInstanceId(task.getSubInstanceId());
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
try {
ProcessResult processResult = broadcastProcessor.preProcess(taskContext);
spReq.setSuccess(processResult.isSuccess());
spReq.setMsg(suit(processResult.getMsg()));
processResult = broadcastProcessor.preProcess(taskContext);
}catch (Throwable e) {
log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
spReq.setSuccess(false);
spReq.setMsg(e.toString());
processResult = new ProcessResult(false, e.toString());
}
}else {
spReq.setSuccess(true);
spReq.setMsg("NO_PREPOST_TASK");
processResult = new ProcessResult(true, "NO_PREPOST_TASK");
}
spReq.setReportTime(System.currentTimeMillis());
taskTrackerActor.tell(spReq, null);
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST);
// 广播执行的第一个 task 只执行 preProcess 部分
return;
}
@ -123,7 +107,6 @@ public class ProcessorRunnable implements Runnable {
Stopwatch stopwatch = Stopwatch.createStarted();
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
ProcessResult lastResult;
List<TaskResult> taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, task.getSubInstanceId());
try {
switch (executeType) {
@ -131,30 +114,30 @@ public class ProcessorRunnable implements Runnable {
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
lastResult = broadcastProcessor.postProcess(taskContext, taskResults);
processResult = broadcastProcessor.postProcess(taskContext, taskResults);
}else {
lastResult = BroadcastProcessor.defaultResult(taskResults);
processResult = BroadcastProcessor.defaultResult(taskResults);
}
break;
case MAP_REDUCE:
if (processor instanceof MapReduceProcessor) {
MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
lastResult = mapReduceProcessor.reduce(taskContext, taskResults);
processResult = mapReduceProcessor.reduce(taskContext, taskResults);
}else {
lastResult = new ProcessResult(false, "not implement the MapReduceProcessor");
processResult = new ProcessResult(false, "not implement the MapReduceProcessor");
}
break;
default:
lastResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
}
}catch (Throwable e) {
lastResult = new ProcessResult(false, e.toString());
processResult = new ProcessResult(false, e.toString());
log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
}
TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, suit(lastResult.getMsg()));
TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, suit(processResult.getMsg()), null);
log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
return;
@ -162,27 +145,28 @@ public class ProcessorRunnable implements Runnable {
// 3. 正式提交运行
ProcessResult processResult;
try {
processResult = processor.process(taskContext);
}catch (Throwable e) {
log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
processResult = new ProcessResult(false, e.toString());
}
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()));
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null);
}
/**
* 上报状态给 TaskTracker
*/
private void reportStatus(TaskStatus status, String result) {
private void reportStatus(TaskStatus status, String result, Integer cmd) {
ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq();
req.setInstanceId(task.getInstanceId());
req.setSubInstanceId(task.getSubInstanceId());
req.setTaskId(task.getTaskId());
req.setStatus(status.getValue());
req.setResult(result);
req.setReportTime(System.currentTimeMillis());
req.setCmd(cmd);
// 最终结束状态要求可靠发送
if (TaskStatus.finishedStatus.contains(status.getValue())) {
@ -205,7 +189,7 @@ public class ProcessorRunnable implements Runnable {
innerRun();
}catch (InterruptedException ignore) {
}catch (Throwable e) {
reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString());
reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null);
log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e);
}finally {
ThreadLocalStore.clear();

View File

@ -113,7 +113,7 @@ public class ProcessorTracker {
// 一旦 ProcessorTracker 出现异常所有提交到此处的任务直接返回失败防止形成死锁
// 死锁分析TT创建PTPT创建失败无法定期汇报心跳TT长时间未收到PT心跳认为PT宕机确实宕机了无法选择可用的PT再次派发任务死锁形成GG斯密达 T_T
if (lethal) {
ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis());
ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getSubInstanceId(), newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis(), null);
taskTrackerActorRef.tell(report, null);
return;
}
@ -139,6 +139,7 @@ public class ProcessorTracker {
if (success) {
ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
reportReq.setInstanceId(instanceId);
reportReq.setSubInstanceId(newTask.getSubInstanceId());
reportReq.setTaskId(newTask.getTaskId());
reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
reportReq.setReportTime(System.currentTimeMillis());

View File

@ -254,10 +254,9 @@ public abstract class TaskTracker {
* @param preExecuteSuccess 预执行广播任务运行状态
* @param subInstanceId 子实例ID
* @param preTaskId 预执行广播任务的taskId
* @param reportTime 上报时间
* @param result 预执行广播任务的结果
*/
public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, long reportTime, String result) {
public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) {
if (finished.get()) {
return;
@ -280,10 +279,6 @@ public abstract class TaskTracker {
}else {
log.debug("[TaskTracker-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, result);
}
// 2. 更新根任务状态广播任务的根任务为 preProcess 任务
int status = preExecuteSuccess ? TaskStatus.WORKER_PROCESS_SUCCESS.getValue() : TaskStatus.WORKER_PROCESS_FAILED.getValue();
updateTaskStatus(preTaskId, status, reportTime, result);
}
/**

View File

@ -1,25 +0,0 @@
package com.github.kfcfans.powerjob.worker.pojo.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.Data;
/**
* 广播任务 preExecute 结束信息
*
* @author tjq
* @since 2020/3/23
*/
@Data
public class BroadcastTaskPreExecuteFinishedReq implements OmsSerializable {
private Long instanceId;
private Long subInstanceId;
private String taskId;
private boolean success;
private String msg;
// 上报时间
private long reportTime;
}

View File

@ -17,7 +17,10 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class ProcessorReportTaskStatusReq implements OmsSerializable {
public static final Integer BROADCAST = 1;
private Long instanceId;
private Long subInstanceId;
private String taskId;
private int status;
@ -29,4 +32,6 @@ public class ProcessorReportTaskStatusReq implements OmsSerializable {
// 上报时间
private long reportTime;
// 特殊请求名称
private Integer cmd;
}