From c4a06b52e79c4ab8a524073341d0d10b11c45c32 Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 24 Mar 2020 20:51:13 +0800 Subject: [PATCH] finished step1: ProcessorTracker receive task and execute --- .../core/executor/ProcessorRunnable.java | 130 ++++++++++-------- .../worker/persistence/ConnectionFactory.java | 2 + .../oms/worker/persistence/TaskDAOImpl.java | 1 - .../src/main/resources/oms-logback.xml | 5 +- .../src/test/resources/logback-test.xml | 26 ++++ 5 files changed, 101 insertions(+), 63 deletions(-) create mode 100644 oh-my-scheduler-worker/src/test/resources/logback-test.xml diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index 1261d304..58a0b865 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -40,73 +40,81 @@ public class ProcessorRunnable implements Runnable { @Override public void run() { - // 0. 创建回复 - ProcessorReportTaskStatusReq reportStatus = new ProcessorReportTaskStatusReq(); - BeanUtils.copyProperties(request, reportStatus); + log.debug("[ProcessorRunnable] start to run task(instanceId={}&taskId={}&taskName={})", request.getInstanceId(), request.getTaskId(), request.getTaskName()); - // 1. 获取 Processor - BasicProcessor processor = getProcessor(); - if (processor == null) { - reportStatus.setStatus(TaskStatus.PROCESS_FAILED.getValue()); - reportStatus.setResult("NO_PROCESSOR"); - taskTrackerActor.tell(reportStatus, null); - return; - } - - // 2. 根任务特殊处理 - ExecuteType executeType = ExecuteType.valueOf(request.getExecuteType()); - if (TaskConstant.ROOT_TASK_ID.equals(request.getTaskId())) { - - // 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task - if (executeType == ExecuteType.BROADCAST) { - - BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; - BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq(); - BeanUtils.copyProperties(request, reportStatus); - try { - ProcessResult processResult = broadcastProcessor.preProcess(); - spReq.setSuccess(processResult.isSuccess()); - spReq.setMsg(processResult.getMsg()); - }catch (Exception e) { - log.warn("[ProcessorRunnable] broadcast task(jobId={}) preProcess failed.", request.getJobId(), e); - spReq.setSuccess(false); - spReq.setMsg(e.toString()); - } - - taskTrackerActor.tell(spReq, null); - } - } - - // 3. 通知 TaskTracker 任务开始运行 - reportStatus.setStatus(TaskStatus.PROCESSING.getValue()); - taskTrackerActor.tell(reportStatus, null); - - // 4. 完成提交前准备工作 - ProcessResult processResult; - TaskContext taskContext = new TaskContext(); - BeanUtils.copyProperties(request, taskContext); - taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent())); - - ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext); - - // 5. 正式提交运行 - ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq(); - BeanUtils.copyProperties(request, reportReq); try { - processResult = processor.process(taskContext); - reportReq.setResult(processResult.getMsg()); - if (processResult.isSuccess()) { - reportReq.setStatus(TaskStatus.PROCESS_SUCCESS.getValue()); - }else { + // 0. 创建回复 + ProcessorReportTaskStatusReq reportStatus = new ProcessorReportTaskStatusReq(); + BeanUtils.copyProperties(request, reportStatus); + + // 1. 获取 Processor + BasicProcessor processor = getProcessor(); + if (processor == null) { + reportStatus.setStatus(TaskStatus.PROCESS_FAILED.getValue()); + reportStatus.setResult("NO_PROCESSOR"); + taskTrackerActor.tell(reportStatus, null); + return; + } + + // 2. 根任务特殊处理 + ExecuteType executeType = ExecuteType.valueOf(request.getExecuteType()); + if (TaskConstant.ROOT_TASK_ID.equals(request.getTaskId())) { + + // 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task + if (executeType == ExecuteType.BROADCAST) { + + BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; + BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq(); + BeanUtils.copyProperties(request, reportStatus); + try { + ProcessResult processResult = broadcastProcessor.preProcess(); + spReq.setSuccess(processResult.isSuccess()); + spReq.setMsg(processResult.getMsg()); + }catch (Exception e) { + log.warn("[ProcessorRunnable] broadcast task(jobId={}) preProcess failed.", request.getJobId(), e); + spReq.setSuccess(false); + spReq.setMsg(e.toString()); + } + + taskTrackerActor.tell(spReq, null); + } + } + + // 3. 通知 TaskTracker 任务开始运行 + reportStatus.setStatus(TaskStatus.PROCESSING.getValue()); + taskTrackerActor.tell(reportStatus, null); + + // 4. 完成提交前准备工作 + ProcessResult processResult; + TaskContext taskContext = new TaskContext(); + BeanUtils.copyProperties(request, taskContext); + if (request.getSubTaskContent() != null && request.getSubTaskContent().length > 0) { + taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent())); + } + + ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext); + + // 5. 正式提交运行 + ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq(); + BeanUtils.copyProperties(request, reportReq); + try { + processResult = processor.process(taskContext); + reportReq.setResult(processResult.getMsg()); + if (processResult.isSuccess()) { + reportReq.setStatus(TaskStatus.PROCESS_SUCCESS.getValue()); + }else { + reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue()); + } + }catch (Exception e) { + log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e); + + reportReq.setResult(e.toString()); reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue()); } + taskTrackerActor.tell(reportReq, null); }catch (Exception e) { - log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e); - - reportReq.setResult(e.toString()); - reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue()); + log.error("[ProcessorRunnable] execute failed, please fix this bug!", e); } - taskTrackerActor.tell(reportReq, null); } private BasicProcessor getProcessor() { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java index 00a5f3f9..b8d100f3 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java @@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.persistence; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.sql.DataSource; import java.sql.Connection; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java index 76e2cf77..50879edf 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -1,6 +1,5 @@ package com.github.kfcfans.oms.worker.persistence; -import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; diff --git a/oh-my-scheduler-worker/src/main/resources/oms-logback.xml b/oh-my-scheduler-worker/src/main/resources/oms-logback.xml index 16c12a08..8c8e67b3 100644 --- a/oh-my-scheduler-worker/src/main/resources/oms-logback.xml +++ b/oh-my-scheduler-worker/src/main/resources/oms-logback.xml @@ -5,12 +5,15 @@ - %d [%t] %-5level %logger{36}.%M\(%file:%line\) - %msg%n + 222222%d [%t] %-5level %logger{36}.%M\(%file:%line\) - %msg%n UTF-8 + + + diff --git a/oh-my-scheduler-worker/src/test/resources/logback-test.xml b/oh-my-scheduler-worker/src/test/resources/logback-test.xml new file mode 100644 index 00000000..13002612 --- /dev/null +++ b/oh-my-scheduler-worker/src/test/resources/logback-test.xml @@ -0,0 +1,26 @@ + + + + + + + + %red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) %green([%thread]) - %cyan(%msg%n) + + UTF-8 + + + + + + + + + + + + + + + +