diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
index 1261d304..58a0b865 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
@@ -40,73 +40,81 @@ public class ProcessorRunnable implements Runnable {
@Override
public void run() {
- // 0. 创建回复
- ProcessorReportTaskStatusReq reportStatus = new ProcessorReportTaskStatusReq();
- BeanUtils.copyProperties(request, reportStatus);
+ log.debug("[ProcessorRunnable] start to run task(instanceId={}&taskId={}&taskName={})", request.getInstanceId(), request.getTaskId(), request.getTaskName());
- // 1. 获取 Processor
- BasicProcessor processor = getProcessor();
- if (processor == null) {
- reportStatus.setStatus(TaskStatus.PROCESS_FAILED.getValue());
- reportStatus.setResult("NO_PROCESSOR");
- taskTrackerActor.tell(reportStatus, null);
- return;
- }
-
- // 2. 根任务特殊处理
- ExecuteType executeType = ExecuteType.valueOf(request.getExecuteType());
- if (TaskConstant.ROOT_TASK_ID.equals(request.getTaskId())) {
-
- // 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task
- if (executeType == ExecuteType.BROADCAST) {
-
- BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
- BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
- BeanUtils.copyProperties(request, reportStatus);
- try {
- ProcessResult processResult = broadcastProcessor.preProcess();
- spReq.setSuccess(processResult.isSuccess());
- spReq.setMsg(processResult.getMsg());
- }catch (Exception e) {
- log.warn("[ProcessorRunnable] broadcast task(jobId={}) preProcess failed.", request.getJobId(), e);
- spReq.setSuccess(false);
- spReq.setMsg(e.toString());
- }
-
- taskTrackerActor.tell(spReq, null);
- }
- }
-
- // 3. 通知 TaskTracker 任务开始运行
- reportStatus.setStatus(TaskStatus.PROCESSING.getValue());
- taskTrackerActor.tell(reportStatus, null);
-
- // 4. 完成提交前准备工作
- ProcessResult processResult;
- TaskContext taskContext = new TaskContext();
- BeanUtils.copyProperties(request, taskContext);
- taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent()));
-
- ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
-
- // 5. 正式提交运行
- ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
- BeanUtils.copyProperties(request, reportReq);
try {
- processResult = processor.process(taskContext);
- reportReq.setResult(processResult.getMsg());
- if (processResult.isSuccess()) {
- reportReq.setStatus(TaskStatus.PROCESS_SUCCESS.getValue());
- }else {
+ // 0. 创建回复
+ ProcessorReportTaskStatusReq reportStatus = new ProcessorReportTaskStatusReq();
+ BeanUtils.copyProperties(request, reportStatus);
+
+ // 1. 获取 Processor
+ BasicProcessor processor = getProcessor();
+ if (processor == null) {
+ reportStatus.setStatus(TaskStatus.PROCESS_FAILED.getValue());
+ reportStatus.setResult("NO_PROCESSOR");
+ taskTrackerActor.tell(reportStatus, null);
+ return;
+ }
+
+ // 2. 根任务特殊处理
+ ExecuteType executeType = ExecuteType.valueOf(request.getExecuteType());
+ if (TaskConstant.ROOT_TASK_ID.equals(request.getTaskId())) {
+
+ // 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task
+ if (executeType == ExecuteType.BROADCAST) {
+
+ BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
+ BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
+ BeanUtils.copyProperties(request, reportStatus);
+ try {
+ ProcessResult processResult = broadcastProcessor.preProcess();
+ spReq.setSuccess(processResult.isSuccess());
+ spReq.setMsg(processResult.getMsg());
+ }catch (Exception e) {
+ log.warn("[ProcessorRunnable] broadcast task(jobId={}) preProcess failed.", request.getJobId(), e);
+ spReq.setSuccess(false);
+ spReq.setMsg(e.toString());
+ }
+
+ taskTrackerActor.tell(spReq, null);
+ }
+ }
+
+ // 3. 通知 TaskTracker 任务开始运行
+ reportStatus.setStatus(TaskStatus.PROCESSING.getValue());
+ taskTrackerActor.tell(reportStatus, null);
+
+ // 4. 完成提交前准备工作
+ ProcessResult processResult;
+ TaskContext taskContext = new TaskContext();
+ BeanUtils.copyProperties(request, taskContext);
+ if (request.getSubTaskContent() != null && request.getSubTaskContent().length > 0) {
+ taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent()));
+ }
+
+ ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
+
+ // 5. 正式提交运行
+ ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
+ BeanUtils.copyProperties(request, reportReq);
+ try {
+ processResult = processor.process(taskContext);
+ reportReq.setResult(processResult.getMsg());
+ if (processResult.isSuccess()) {
+ reportReq.setStatus(TaskStatus.PROCESS_SUCCESS.getValue());
+ }else {
+ reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue());
+ }
+ }catch (Exception e) {
+ log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e);
+
+ reportReq.setResult(e.toString());
reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue());
}
+ taskTrackerActor.tell(reportReq, null);
}catch (Exception e) {
- log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e);
-
- reportReq.setResult(e.toString());
- reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue());
+ log.error("[ProcessorRunnable] execute failed, please fix this bug!", e);
}
- taskTrackerActor.tell(reportReq, null);
}
private BasicProcessor getProcessor() {
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java
index 00a5f3f9..b8d100f3 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java
@@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.persistence;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.Connection;
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java
index 76e2cf77..50879edf 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java
@@ -1,6 +1,5 @@
package com.github.kfcfans.oms.worker.persistence;
-import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
diff --git a/oh-my-scheduler-worker/src/main/resources/oms-logback.xml b/oh-my-scheduler-worker/src/main/resources/oms-logback.xml
index 16c12a08..8c8e67b3 100644
--- a/oh-my-scheduler-worker/src/main/resources/oms-logback.xml
+++ b/oh-my-scheduler-worker/src/main/resources/oms-logback.xml
@@ -5,12 +5,15 @@
- %d [%t] %-5level %logger{36}.%M\(%file:%line\) - %msg%n
+ 222222%d [%t] %-5level %logger{36}.%M\(%file:%line\) - %msg%n
UTF-8
+
+
+
diff --git a/oh-my-scheduler-worker/src/test/resources/logback-test.xml b/oh-my-scheduler-worker/src/test/resources/logback-test.xml
new file mode 100644
index 00000000..13002612
--- /dev/null
+++ b/oh-my-scheduler-worker/src/test/resources/logback-test.xml
@@ -0,0 +1,26 @@
+
+
+
+
+
+
+
+ %red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) %green([%thread]) - %cyan(%msg%n)
+
+ UTF-8
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+