feat: optimize the code of TaskTracker

This commit is contained in:
Echo009 2023-01-04 22:43:23 +08:00
parent 9f6d421ed2
commit fe03b8faab
3 changed files with 34 additions and 57 deletions

View File

@ -61,7 +61,7 @@ public abstract class TaskTracker {
protected static final int MAX_REPORT_FAILED_THRESHOLD = 5;
public TaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
protected TaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
this.createTime = System.currentTimeMillis();
this.workerRuntime = workerRuntime;
this.instanceId = req.getInstanceId();
@ -112,7 +112,7 @@ public abstract class TaskTracker {
serverActor.tell(response, null);
}
protected void reportFinalStatusThenDestory(ActorSelection serverActor, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) {
protected void reportFinalStatusThenDestroy(ActorSelection serverActor, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) {
// 最终状态需要可靠上报
CompletionStage<Object> ask = Patterns.ask(serverActor, reportInstanceStatusReq, Duration.ofSeconds(15));
boolean serverAccepted = false;

View File

@ -234,35 +234,11 @@ public class CommonTaskTracker extends HeavyTaskTracker {
// 4. 执行完毕报告服务器
if (finished.get()) {
req.setResult(result);
// 上报追加的工作流上下文信息
req.setAppendedWfContext(appendedWfContext);
req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
CompletionStage<Object> askCS = Patterns.ask(serverActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
boolean serverAccepted = false;
try {
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverAccepted = askResponse.isSuccess();
} catch (Exception e) {
log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result, e);
}
// 服务器未接受上报则等待下次重新上报
if (!serverAccepted) {
if (++reportFailedCnt > MAX_REPORT_FAILED_THRESHOLD) {
log.error("[TaskTracker-{}] try to report finished status(success={}, result={}) lots of times but all failed, it's time to give up, so the process result will be dropped", instanceId, success, result);
destroy();
}
return;
}
// 服务器已经更新状态,任务已经执行完毕,开始释放所有资源
log.info("[TaskTracker-{}] instance process finished,result = {}, start to release resource...", instanceId, result);
destroy();
reportFinalStatusThenDestroy(serverActor,req);
return;
}

View File

@ -59,10 +59,6 @@ public class LightTaskTracker extends TaskTracker {
* 任务状态
*/
private TaskStatus status;
/**
* 创建时间
*/
private final Long createTime;
/**
* 任务开始执行的时间
*/
@ -80,12 +76,13 @@ public class LightTaskTracker extends TaskTracker {
protected final AtomicBoolean stopFlag = new AtomicBoolean(false);
protected final AtomicBoolean destroyFlag = new AtomicBoolean(false);
public LightTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
super(req, workerRuntime);
try {
taskContext = constructTaskContext(req, workerRuntime);
createTime = System.currentTimeMillis();
// 等待处理
status = TaskStatus.WORKER_RECEIVED;
// 加载 Processor
@ -131,6 +128,10 @@ public class LightTaskTracker extends TaskTracker {
@Override
public void destroy() {
if (!destroyFlag.compareAndSet(false, true)) {
log.warn("[TaskTracker-{}] This TaskTracker has been destroyed!", instanceId);
return;
}
if (statusReportScheduledFuture != null) {
statusReportScheduledFuture.cancel(true);
}
@ -153,11 +154,10 @@ public class LightTaskTracker extends TaskTracker {
log.warn("[TaskTracker-{}] fail to stop task,task is finished!result:{}", instanceId, result);
return;
}
if (stopFlag.get()) {
if (!stopFlag.compareAndSet(false, true)) {
log.warn("[TaskTracker-{}] task has been mark as stopped,ignore this request!", instanceId);
return;
}
stopFlag.set(true);
// 当前任务尚未执行
if (status == TaskStatus.WORKER_RECEIVED) {
log.warn("[TaskTracker-{}] task is not started,destroy this taskTracker directly!", instanceId);
@ -206,6 +206,7 @@ public class LightTaskTracker extends TaskTracker {
res = processorInfo.getBasicProcessor().process(taskContext);
} catch (InterruptedException e) {
log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e);
Thread.currentThread().interrupt();
res = new ProcessResult(false, e.toString());
} catch (Exception e) {
log.warn("[TaskTracker-{}] process failed !", instanceId, e);
@ -236,14 +237,12 @@ public class LightTaskTracker extends TaskTracker {
}
/*
TODO:建议添加锁 + 销毁判断,2 种极限场景:
1. processTask 调用的同时,定时任务再次触发:理论上定时任务的处理栈上会抛出 InterruptedException(虽然无实际功能上的影响,但不够优雅,而且总归是问题)
2. 先定时任务触发,同时 processTask 调用:这种情况好像没太大的问题(重复上报 2 次)
总而言之,建议考虑理论情况下的"并行"场景,加个锁和入口状态判断,增加代码健壮性
*/
private void checkAndReportStatus() {
private synchronized void checkAndReportStatus() {
if (destroyFlag.get()) {
// 已经被销毁不需要上报状态
log.info("[TaskTracker-{}] has been destroyed,final status is {},needn't to report status!", instanceId, status);
return;
}
String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath);
TaskTrackerReportInstanceStatusReq reportInstanceStatusReq = new TaskTrackerReportInstanceStatusReq();
@ -268,6 +267,7 @@ public class LightTaskTracker extends TaskTracker {
log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName());
// 被终止的任务不需要上报状态
destroy();
return;
}
} catch (Exception e) {
log.warn("[TaskTracker-{}] task need stop,fail to stop thread {}", instanceId, workerThread.getName(), e);
@ -290,7 +290,7 @@ public class LightTaskTracker extends TaskTracker {
reportInstanceStatusReq.setEndTime(taskEndTime);
// 微操一下:上报最终状态时重新设置下时间,并且增加一小段偏移,保证在并发上报运行中状态以及最终状态时,最终状态的上报时间晚于运行中的状态
reportInstanceStatusReq.setReportTime(System.currentTimeMillis() + 1);
reportFinalStatusThenDestory(serverActor, reportInstanceStatusReq);
reportFinalStatusThenDestroy(serverActor, reportInstanceStatusReq);
return;
}
// 未完成的任务只需要上报状态
@ -342,20 +342,20 @@ public class LightTaskTracker extends TaskTracker {
}
private TaskContext constructTaskContext(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
final TaskContext taskContext = new TaskContext();
taskContext.setTaskId(req.getJobId() + "#" + req.getInstanceId());
taskContext.setJobId(req.getJobId());
taskContext.setJobParams(req.getJobParams());
taskContext.setInstanceId(req.getInstanceId());
taskContext.setInstanceParams(req.getInstanceParams());
taskContext.setWorkflowContext(new WorkflowContext(req.getWfInstanceId(), req.getInstanceParams()));
taskContext.setOmsLogger(OmsLoggerFactory.build(req.getInstanceId(), req.getLogConfig(), workerRuntime));
taskContext.setTaskName(TaskConstant.ROOT_TASK_NAME);
taskContext.setMaxRetryTimes(req.getTaskRetryNum());
taskContext.setCurrentRetryTimes(0);
taskContext.setUserContext(workerRuntime.getWorkerConfig().getUserContext());
final TaskContext context = new TaskContext();
context.setTaskId(req.getJobId() + "#" + req.getInstanceId());
context.setJobId(req.getJobId());
context.setJobParams(req.getJobParams());
context.setInstanceId(req.getInstanceId());
context.setInstanceParams(req.getInstanceParams());
context.setWorkflowContext(new WorkflowContext(req.getWfInstanceId(), req.getInstanceParams()));
context.setOmsLogger(OmsLoggerFactory.build(req.getInstanceId(), req.getLogConfig(), workerRuntime));
context.setTaskName(TaskConstant.ROOT_TASK_NAME);
context.setMaxRetryTimes(req.getTaskRetryNum());
context.setCurrentRetryTimes(0);
context.setUserContext(workerRuntime.getWorkerConfig().getUserContext());
// 轻量级任务不会涉及到任务分片的处理,不需要处理子任务相关的信息
return taskContext;
return context;
}
private String suit(String result) {
@ -373,6 +373,7 @@ public class LightTaskTracker extends TaskTracker {
/**
* try force stop thread
*
* @param thread thread
* @return stop result
*/