From ab642805c42815eeab8b17a1e1523f6e84c28a4b Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 25 Mar 2020 20:21:26 +0800 Subject: [PATCH] test the TaskTracker(Standalone mode) and fix a lot of children bug --- .../common/request/ServerScheduleJobReq.java | 6 +- .../TaskTrackerReportInstanceStatusReq.java | 4 +- .../common/request/WorkerHealthReportReq.java | 4 +- .../kfcfans/common/response/AskResponse.java | 4 +- .../oms/worker/actors/TaskTrackerActor.java | 26 +++++++- .../worker/common/constants/TaskStatus.java | 32 ++++++---- .../core/executor/ProcessorRunnable.java | 6 +- .../worker/core/tracker/task/TaskTracker.java | 48 ++++++++------ .../core/tracker/task/TaskTrackerPool.java | 5 ++ .../oms/worker/persistence/TaskDAO.java | 5 -- .../oms/worker/persistence/TaskDAOImpl.java | 10 +-- .../persistence/TaskPersistenceService.java | 5 +- .../BroadcastTaskPreExecuteFinishedReq.java | 4 +- .../pojo/request/ProcessorMapTaskRequest.java | 3 +- .../request/ProcessorReportTaskStatusReq.java | 4 +- .../request/TaskTrackerStopInstanceReq.java | 4 +- .../github/kfcfans/oms/TaskTrackerTest.java | 64 +++++++++++++++++++ 17 files changed, 173 insertions(+), 61 deletions(-) create mode 100644 oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java index 812a3ab8..63e3e665 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java @@ -2,6 +2,8 @@ package com.github.kfcfans.common.request; import lombok.Data; +import java.io.Serializable; + /** * 服务端调度任务请求(一次任务处理的入口) * @@ -9,7 +11,7 @@ import lombok.Data; * @since 2020/3/17 */ @Data -public class ServerScheduleJobReq { +public class ServerScheduleJobReq implements Serializable { // 调度的服务器地址,默认通讯目标 private String serverAddress; @@ -27,7 +29,7 @@ public class ServerScheduleJobReq { // 任务执行时间限制,单位毫秒 private long timeLimit; // 可用处理器地址,可能多值,逗号分隔 - private String workerAddress; + private String allWorkerAddress; private String jobParams; private String instanceParams; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java index e7f85aba..06187179 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java @@ -2,6 +2,8 @@ package com.github.kfcfans.common.request; import lombok.Data; +import java.io.Serializable; + /** * TaskTracker 将状态上报给服务器 * @@ -9,7 +11,7 @@ import lombok.Data; * @since 2020/3/17 */ @Data -public class TaskTrackerReportInstanceStatusReq { +public class TaskTrackerReportInstanceStatusReq implements Serializable { private String jobId; private String instanceId; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerHealthReportReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerHealthReportReq.java index 9ad974c2..d0223f62 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerHealthReportReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerHealthReportReq.java @@ -3,6 +3,8 @@ package com.github.kfcfans.common.request; import com.github.kfcfans.common.model.SystemMetrics; import lombok.Data; +import java.io.Serializable; + /** * Worker 上报健康信息(worker定时发送的heartbeat) * @@ -10,7 +12,7 @@ import lombok.Data; * @since 2020/3/25 */ @Data -public class WorkerHealthReportReq { +public class WorkerHealthReportReq implements Serializable { // 本机地址 -> IP:port private String totalAddress; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java index b9326138..474dd3a2 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java @@ -4,6 +4,8 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import java.io.Serializable; + /** * Pattens.ask 的响应 * @@ -13,6 +15,6 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor -public class AskResponse { +public class AskResponse implements Serializable { private boolean success; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java index 1e0dc724..76490786 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java @@ -7,12 +7,14 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.oms.worker.persistence.TaskDO; +import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq; import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.common.response.AskResponse; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; import java.util.List; @@ -45,7 +47,11 @@ public class TaskTrackerActor extends AbstractActor { if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); } else { - taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), req.getStatus(), req.getResult(), false); + + // 状态转化 + TaskStatus status = TaskStatus.convertStatus(TaskStatus.of(req.getStatus())); + + taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), status.getValue(), req.getResult(), false); } } @@ -120,7 +126,21 @@ public class TaskTrackerActor extends AbstractActor { * 服务器任务调度处理器 */ private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) { - // 接受到任务,创建 TaskTracker - } + String instanceId = req.getInstanceId(); + TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(instanceId); + if (taskTracker != null) { + log.warn("[TaskTrackerActor] TaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId); + return; + } + + // 原子创建,防止多实例的存在 + TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> { + + JobInstanceInfo jobInstanceInfo = new JobInstanceInfo(); + BeanUtils.copyProperties(req, jobInstanceInfo); + + return new TaskTracker(jobInstanceInfo); + }); + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java index 8808a49c..ea4555da 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java @@ -16,9 +16,10 @@ public enum TaskStatus { /* ******************* TaskTracker 专用 ******************* */ WAITING_DISPATCH(1, "等待调度器调度"), DISPATCH_SUCCESS_WORKER_UNCHECK(2, "调度成功(但不保证worker收到)"), - WORKER_PROCESSING(3, "worker开始执行"), - WORKER_PROCESS_FAILED(4, "worker执行失败"), - WORKER_PROCESS_SUCCESS(5, "worker执行成功"), + WORKER_RECEIVED(3, "worker接收成功,但未开始执行"), + WORKER_PROCESSING(4, "worker正在执行"), + WORKER_PROCESS_FAILED(5, "worker执行失败"), + WORKER_PROCESS_SUCCESS(6, "worker执行成功"), /* ******************* Worker 专用 ******************* */ RECEIVE_SUCCESS(11, "成功接受任务但未开始执行(此时worker满载,暂时无法运行)"), @@ -30,18 +31,21 @@ public enum TaskStatus { private String des; public static TaskStatus of(int v) { - switch (v) { - case 1: return WAITING_DISPATCH; - case 2: return DISPATCH_SUCCESS_WORKER_UNCHECK; - case 3: return WORKER_PROCESSING; - case 4: return WORKER_PROCESS_FAILED; - case 5: return WORKER_PROCESS_SUCCESS; - - case 11: return RECEIVE_SUCCESS; - case 12: return PROCESSING; - case 13: return PROCESS_FAILED; - case 14: return PROCESS_SUCCESS; + for (TaskStatus taskStatus : values()) { + if (v == taskStatus.value) { + return taskStatus; + } } throw new IllegalArgumentException("no TaskStatus match the value of " + v); } + + public static TaskStatus convertStatus(TaskStatus processorStatus) { + switch (processorStatus) { + case RECEIVE_SUCCESS: return WORKER_RECEIVED; + case PROCESSING: return WORKER_PROCESSING; + case PROCESS_FAILED: return WORKER_PROCESS_FAILED; + case PROCESS_SUCCESS: return WORKER_PROCESS_SUCCESS; + } + throw new IllegalArgumentException(processorStatus.name() + " is not the processor status."); + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index e0b71ca0..93ffa47c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -58,7 +58,7 @@ public class ProcessorRunnable implements Runnable { } ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext); - reportStatus(TaskStatus.WORKER_PROCESSING, null); + reportStatus(TaskStatus.PROCESSING, null); // 1. 获取 Processor BasicProcessor processor = getProcessor(); @@ -123,7 +123,7 @@ public class ProcessorRunnable implements Runnable { log.warn("[ProcessorRunnable] execute last task(instanceId={},taskId={}) failed.", instanceId, taskId, e); } - TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED; + TaskStatus status = lastResult.isSuccess() ? TaskStatus.PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED; reportStatus(status, lastResult.getMsg()); log.info("[ProcessorRunnable] instance(instanceId={},taskId={})' last task execute successfully, using time: {}", instanceId, taskId, stopwatch); @@ -139,7 +139,7 @@ public class ProcessorRunnable implements Runnable { log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e); processResult = new ProcessResult(false, e.toString()); } - reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED, processResult.getMsg()); + reportStatus(processResult.isSuccess() ? TaskStatus.PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED, processResult.getMsg()); }catch (Exception e) { log.error("[ProcessorRunnable] execute failed, please fix this bug!", e); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index 85dc1377..265b8b67 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -1,16 +1,13 @@ package com.github.kfcfans.oms.worker.core.tracker.task; -import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.pattern.Patterns; -import ch.qos.logback.core.util.SystemInfo; import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.JobInstanceStatus; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.common.AkkaConstant; -import com.github.kfcfans.oms.worker.common.OhMyConfig; import com.github.kfcfans.oms.worker.common.constants.CommonSJ; import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; @@ -24,6 +21,7 @@ import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStopInstanceReq; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -42,14 +40,14 @@ import java.util.concurrent.atomic.AtomicBoolean; * @since 2020/3/17 */ @Slf4j -public abstract class TaskTracker { +@ToString +public class TaskTracker { protected long startTime; protected long jobTimeLimitMS; // 任务实例信息 protected JobInstanceInfo jobInstanceInfo; - protected ActorRef taskTrackerActorRef; @Getter protected List allWorkerAddress; @@ -59,13 +57,14 @@ public abstract class TaskTracker { protected AtomicBoolean finished = new AtomicBoolean(false); - public TaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) { + public TaskTracker(JobInstanceInfo jobInstanceInfo) { + + log.info("[TaskTracker] start to create TaskTracker for instance({}).", jobInstanceInfo); this.startTime = System.currentTimeMillis(); this.jobTimeLimitMS = jobInstanceInfo.getTimeLimit(); this.jobInstanceInfo = jobInstanceInfo; - this.taskTrackerActorRef = taskTrackerActorRef; this.taskPersistenceService = TaskPersistenceService.INSTANCE; ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build(); @@ -81,6 +80,7 @@ public abstract class TaskTracker { // 定时任务2:状态检查 scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS); + } @@ -165,11 +165,12 @@ public abstract class TaskTracker { rootTask.setAddress(NetUtils.getLocalHost()); rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); rootTask.setCreatedTime(System.currentTimeMillis()); - rootTask.setCreatedTime(System.currentTimeMillis()); + rootTask.setLastModifiedTime(System.currentTimeMillis()); if (!taskPersistenceService.save(rootTask)) { throw new RuntimeException("create root task failed."); } + log.info("[TaskTracker] create root task successfully for instance(instanceId={}).", jobInstanceInfo.getInstanceId()); } @@ -199,10 +200,12 @@ public abstract class TaskTracker { ActorSelection targetActor = OhMyWorker.actorSystem.actorSelection(targetPath); // 发送请求(Akka的tell是至少投递一次,经实验表明无法投递消息也不会报错...印度啊...) - targetActor.tell(req, taskTrackerActorRef); + targetActor.tell(req, null); // 更新数据库(如果更新数据库失败,可能导致重复执行,先不处理) taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, null); + + log.debug("[TaskTracker] dispatch task({instanceId={},taskId={},taskName={}} successfully.)", task.getInstanceId(), task.getTaskId(), task.getTaskName()); }catch (Exception e) { // 调度失败,不修改数据库,下次重新随机派发给 remote actor log.warn("[TaskTracker] dispatch task({}) failed.", task); @@ -218,24 +221,25 @@ public abstract class TaskTracker { private static final long TIME_OUT_MS = 5000; - @Override - public void run() { + + private void innerRun() { final String instanceId = jobInstanceInfo.getInstanceId(); // 1. 查询统计信息 Map status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId); - long waitingDispatchNum = status2Num.get(TaskStatus.WAITING_DISPATCH); - long workerUnreceivedNum = status2Num.get(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK); - long receivedNum = status2Num.get(TaskStatus.RECEIVE_SUCCESS); - long succeedNum = status2Num.get(TaskStatus.WORKER_PROCESS_SUCCESS); - long failedNum = status2Num.get(TaskStatus.WORKER_PROCESS_FAILED); + long waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L); + long workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L); + long receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L); + long runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L); + long succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L); + long failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L); long finishedNum = succeedNum + failedNum; - long unfinishedNum = waitingDispatchNum + workerUnreceivedNum + receivedNum; + long unfinishedNum = waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum; - log.debug("[TaskTracker] status check result({})", status2Num); + log.debug("[TaskTracker] status check result: {}", status2Num); TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq(); req.setJobId(jobInstanceInfo.getJobId()); @@ -348,5 +352,13 @@ public abstract class TaskTracker { } + @Override + public void run() { + try { + innerRun(); + }catch (Exception e) { + log.warn("[TaskTracker] status checker execute failed, please fix the bug (@tjq)!", e); + } + } } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java index 6fd4f37b..e5296da6 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java @@ -3,6 +3,7 @@ package com.github.kfcfans.oms.worker.core.tracker.task; import com.google.common.collect.Maps; import java.util.Map; +import java.util.function.Function; /** * 持有 Processor 对象 @@ -25,4 +26,8 @@ public class TaskTrackerPool { instanceId2TaskTracker.remove(instanceId); } + public static void atomicCreateTaskTracker(String instanceId, Function creator) { + instanceId2TaskTracker.computeIfAbsent(instanceId, creator); + } + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java index 921cbad3..1fa22481 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java @@ -23,11 +23,6 @@ public interface TaskDAO { boolean save(TaskDO task); boolean batchSave(Collection tasks); - /** - * 更新任务数据,必须有主键 instanceId + taskId - */ - boolean update(TaskDO task); - int batchDelete(String instanceId, List taskIds); List simpleQuery(SimpleTaskQuery query); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java index 121af1a7..63f55489 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -36,7 +36,7 @@ public class TaskDAOImpl implements TaskDAO { String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)"; try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { fillInsertPreparedStatement(task, ps); - return ps.execute(); + return ps.executeUpdate() == 1; }catch (Exception e) { log.error("[TaskDAO] insert failed.", e); } @@ -64,11 +64,6 @@ public class TaskDAOImpl implements TaskDAO { } - @Override - public boolean update(TaskDO task) { - return false; - } - @Override public int batchDelete(String instanceId, List taskIds) { String deleteSQL = "delete from task_info where instance_id = %s and task_id in %s"; @@ -144,7 +139,8 @@ public class TaskDAOImpl implements TaskDAO { String sqlFormat = "update task_info set %s where %s"; String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition()); try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) { - return stat.execute(); + stat.executeUpdate(); + return true; }catch (Exception e) { log.error("[TaskDAO] simpleUpdate failed(sql = {}).", updateField, e); return false; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index f1093d14..873dc855 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -97,8 +97,9 @@ public class TaskPersistenceService { Map result = Maps.newHashMap(); dbRES.forEach(row -> { - int status = Integer.parseInt(String.valueOf(row.get("status"))); - long num = Long.parseLong(String.valueOf(row.get("num"))); + // H2 数据库都是大写... + int status = Integer.parseInt(String.valueOf(row.get("STATUS"))); + long num = Long.parseLong(String.valueOf(row.get("NUM"))); result.put(TaskStatus.of(status), num); }); return result; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java index 3350aff6..935338ea 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java @@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.pojo.request; import lombok.Data; +import java.io.Serializable; + /** * 广播任务 preExecute 结束信息 * @@ -9,7 +11,7 @@ import lombok.Data; * @since 2020/3/23 */ @Data -public class BroadcastTaskPreExecuteFinishedReq { +public class BroadcastTaskPreExecuteFinishedReq implements Serializable { private String instanceId; private String taskId; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java index 4bd30338..d3c684bb 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java @@ -8,6 +8,7 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; +import java.io.Serializable; import java.util.List; /** @@ -18,7 +19,7 @@ import java.util.List; */ @Getter @NoArgsConstructor -public class ProcessorMapTaskRequest { +public class ProcessorMapTaskRequest implements Serializable { private String instanceId; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorReportTaskStatusReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorReportTaskStatusReq.java index 390bb5b6..9519d9a8 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorReportTaskStatusReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorReportTaskStatusReq.java @@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.pojo.request; import lombok.Data; +import java.io.Serializable; + /** * worker 上报 task 执行情况 * @@ -9,7 +11,7 @@ import lombok.Data; * @since 2020/3/17 */ @Data -public class ProcessorReportTaskStatusReq { +public class ProcessorReportTaskStatusReq implements Serializable { private String instanceId; private String taskId; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStopInstanceReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStopInstanceReq.java index f1e3b5e5..56e0fde3 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStopInstanceReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStopInstanceReq.java @@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.pojo.request; import lombok.Data; +import java.io.Serializable; + /** * TaskTracker 停止 ProcessorTracker,释放相关资源 * 任务执行完毕后停止 OR 手动强制停止 @@ -10,7 +12,7 @@ import lombok.Data; * @since 2020/3/25 */ @Data -public class TaskTrackerStopInstanceReq { +public class TaskTrackerStopInstanceReq implements Serializable { private String instanceId; // 保留字段,暂时没用 diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java new file mode 100644 index 00000000..c8f71242 --- /dev/null +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java @@ -0,0 +1,64 @@ +package com.github.kfcfans.oms; + +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import com.github.kfcfans.common.AkkaConstant; +import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.common.ProcessorType; +import com.github.kfcfans.common.request.ServerScheduleJobReq; +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.OhMyConfig; +import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; +import com.github.kfcfans.oms.worker.common.utils.NetUtils; +import com.typesafe.config.ConfigFactory; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * 测试完整的 JobInstance 执行流程 + * + * @author tjq + * @since 2020/3/25 + */ +public class TaskTrackerTest { + + private static ActorSelection remoteTaskTracker; + + @BeforeAll + public static void init() { + + OhMyConfig ohMyConfig = new OhMyConfig(); + ohMyConfig.setAppName("oms-test"); + OhMyWorker worker = new OhMyWorker(); + worker.setConfig(ohMyConfig); + worker.init(); + + ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); + String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), AkkaConstant.Task_TRACKER_ACTOR_NAME); + remoteTaskTracker = testAS.actorSelection(akkaRemotePath); + } + + @Test + public void testStandaloneJob() throws Exception { + + ServerScheduleJobReq req = new ServerScheduleJobReq(); + + req.setJobId("1"); + req.setInstanceId("10086"); + req.setAllWorkerAddress(NetUtils.getLocalHost()); + req.setExecuteType(ExecuteType.STANDALONE.name()); + req.setJobParams("this is job Params"); + req.setInstanceParams("this is instance Params"); + req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name()); + req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor"); + req.setTaskRetryNum(3); + req.setThreadConcurrency(5); + req.setTimeLimit(500000); + + remoteTaskTracker.tell(req, null); + + Thread.sleep(500000); + } + + +}