finished step1: ProcessorTracker receive task and execute

This commit is contained in:
tjq 2020-03-24 20:51:13 +08:00
parent a6482edd38
commit c4a06b52e7
5 changed files with 101 additions and 63 deletions

View File

@ -40,73 +40,81 @@ public class ProcessorRunnable implements Runnable {
@Override
public void run() {
// 0. 创建回复
ProcessorReportTaskStatusReq reportStatus = new ProcessorReportTaskStatusReq();
BeanUtils.copyProperties(request, reportStatus);
log.debug("[ProcessorRunnable] start to run task(instanceId={}&taskId={}&taskName={})", request.getInstanceId(), request.getTaskId(), request.getTaskName());
// 1. 获取 Processor
BasicProcessor processor = getProcessor();
if (processor == null) {
reportStatus.setStatus(TaskStatus.PROCESS_FAILED.getValue());
reportStatus.setResult("NO_PROCESSOR");
taskTrackerActor.tell(reportStatus, null);
return;
}
// 2. 根任务特殊处理
ExecuteType executeType = ExecuteType.valueOf(request.getExecuteType());
if (TaskConstant.ROOT_TASK_ID.equals(request.getTaskId())) {
// 广播执行先选本机执行 preProcess完成后TaskTracker再为所有Worker生成子Task
if (executeType == ExecuteType.BROADCAST) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
BeanUtils.copyProperties(request, reportStatus);
try {
ProcessResult processResult = broadcastProcessor.preProcess();
spReq.setSuccess(processResult.isSuccess());
spReq.setMsg(processResult.getMsg());
}catch (Exception e) {
log.warn("[ProcessorRunnable] broadcast task(jobId={}) preProcess failed.", request.getJobId(), e);
spReq.setSuccess(false);
spReq.setMsg(e.toString());
}
taskTrackerActor.tell(spReq, null);
}
}
// 3. 通知 TaskTracker 任务开始运行
reportStatus.setStatus(TaskStatus.PROCESSING.getValue());
taskTrackerActor.tell(reportStatus, null);
// 4. 完成提交前准备工作
ProcessResult processResult;
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(request, taskContext);
taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent()));
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
// 5. 正式提交运行
ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
BeanUtils.copyProperties(request, reportReq);
try {
processResult = processor.process(taskContext);
reportReq.setResult(processResult.getMsg());
if (processResult.isSuccess()) {
reportReq.setStatus(TaskStatus.PROCESS_SUCCESS.getValue());
}else {
// 0. 创建回复
ProcessorReportTaskStatusReq reportStatus = new ProcessorReportTaskStatusReq();
BeanUtils.copyProperties(request, reportStatus);
// 1. 获取 Processor
BasicProcessor processor = getProcessor();
if (processor == null) {
reportStatus.setStatus(TaskStatus.PROCESS_FAILED.getValue());
reportStatus.setResult("NO_PROCESSOR");
taskTrackerActor.tell(reportStatus, null);
return;
}
// 2. 根任务特殊处理
ExecuteType executeType = ExecuteType.valueOf(request.getExecuteType());
if (TaskConstant.ROOT_TASK_ID.equals(request.getTaskId())) {
// 广播执行先选本机执行 preProcess完成后TaskTracker再为所有Worker生成子Task
if (executeType == ExecuteType.BROADCAST) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
BeanUtils.copyProperties(request, reportStatus);
try {
ProcessResult processResult = broadcastProcessor.preProcess();
spReq.setSuccess(processResult.isSuccess());
spReq.setMsg(processResult.getMsg());
}catch (Exception e) {
log.warn("[ProcessorRunnable] broadcast task(jobId={}) preProcess failed.", request.getJobId(), e);
spReq.setSuccess(false);
spReq.setMsg(e.toString());
}
taskTrackerActor.tell(spReq, null);
}
}
// 3. 通知 TaskTracker 任务开始运行
reportStatus.setStatus(TaskStatus.PROCESSING.getValue());
taskTrackerActor.tell(reportStatus, null);
// 4. 完成提交前准备工作
ProcessResult processResult;
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(request, taskContext);
if (request.getSubTaskContent() != null && request.getSubTaskContent().length > 0) {
taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent()));
}
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
// 5. 正式提交运行
ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
BeanUtils.copyProperties(request, reportReq);
try {
processResult = processor.process(taskContext);
reportReq.setResult(processResult.getMsg());
if (processResult.isSuccess()) {
reportReq.setStatus(TaskStatus.PROCESS_SUCCESS.getValue());
}else {
reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue());
}
}catch (Exception e) {
log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e);
reportReq.setResult(e.toString());
reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue());
}
taskTrackerActor.tell(reportReq, null);
}catch (Exception e) {
log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e);
reportReq.setResult(e.toString());
reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue());
log.error("[ProcessorRunnable] execute failed, please fix this bug!", e);
}
taskTrackerActor.tell(reportReq, null);
}
private BasicProcessor getProcessor() {

View File

@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.persistence;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.Connection;

View File

@ -1,6 +1,5 @@
package com.github.kfcfans.oms.worker.persistence;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;

View File

@ -5,12 +5,15 @@
<!-- ConsoleAppender把日志输出到控制台 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d [%t] %-5level %logger{36}.%M\(%file:%line\) - %msg%n</pattern>
<pattern>222222%d [%t] %-5level %logger{36}.%M\(%file:%line\) - %msg%n</pattern>
<!-- 控制台也要使用UTF-8不要使用GBK否则会中文乱码 -->
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="com.zaxxer.hikari" level="ERROR">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="com.github.kfcfans.oms" level="INFO" additivity="false">
<appender-ref ref="STDOUT"/>

View File

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- %m输出的信息,%p日志级别,%t线程名,%d日期,%c类的全名,%i索引【从数字0开始递增】,,, -->
<!-- appender是configuration的子节点是负责写日志的组件。 -->
<!-- ConsoleAppender把日志输出到控制台 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) %green([%thread]) - %cyan(%msg%n)</pattern>
<!-- 控制台也要使用UTF-8不要使用GBK否则会中文乱码 -->
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="com.zaxxer.hikari" level="ERROR">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="com.github.kfcfans.oms" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
<!-- 控制台输出日志级别 -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>