diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java index 63e3e665..86b83140 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java @@ -15,29 +15,49 @@ public class ServerScheduleJobReq implements Serializable { // 调度的服务器地址,默认通讯目标 private String serverAddress; + // 可用处理器地址,可能多值,逗号分隔 + private String allWorkerAddress; /* *********************** 任务相关属性 *********************** */ + /** + * 基础信息 + */ private String jobId; private String instanceId; + + /** + * 任务执行处理器信息 + */ // 任务执行类型,单机、广播、MR private String executeType; // 处理器类型(JavaBean、Jar、脚本等) private String processorType; // 处理器信息 private String processorInfo; - // 任务执行时间限制,单位毫秒 - private long timeLimit; - // 可用处理器地址,可能多值,逗号分隔 - private String allWorkerAddress; + + /** + * 超时时间 + */ + // 整个任务的总体超时时间 + private long instanceTimeoutMS; + // Task的超时时间 + private long taskTimeoutMS; + + /** + * 任务运行参数 + */ + // 任务级别的参数,相当于类的static变量 private String jobParams; + // 实例级别的参数,相当于类的普通变量 private String instanceParams; - /* *********************** Map/MapReduce 任务专用 *********************** */ - // 每台机器的处理线程数上限 private int threadConcurrency; // 子任务重试次数(任务本身的重试机制由server控制) private int taskRetryNum; + + + } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java index 338ca4a6..845329f2 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java @@ -2,6 +2,7 @@ package com.github.kfcfans.common.utils; import lombok.extern.slf4j.Slf4j; +import java.util.Collection; import java.util.function.Supplier; @@ -61,4 +62,17 @@ public class CommonUtils { } return booleanExecutor.get(); } + + /** + * 生成数据库查询语句 in 后的条件 + * ["a", "b", "c"] -> ('a','b','c') + */ + public static String getInStringCondition(Collection collection) { + if (collection == null || collection.isEmpty()) { + return "()"; + } + StringBuilder sb = new StringBuilder(" ( "); + collection.forEach(str -> sb.append("'").append(str).append("',")); + return sb.replace(sb.length() -1, sb.length(), " ) ").toString(); + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java index e6fa28d8..ed334bdf 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java @@ -3,6 +3,7 @@ package com.github.kfcfans.oms.worker.actors; import akka.actor.AbstractActor; import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTracker; import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTrackerPool; +import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; import lombok.extern.slf4j.Slf4j; @@ -27,13 +28,21 @@ public class ProcessorTrackerActor extends AbstractActor { * 处理来自TaskTracker的task执行请求 */ private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { - String jobId = req.getJobId(); - String instanceId = req.getInstanceId(); + String jobId = req.getInstanceInfo().getJobId(); + String instanceId = req.getInstanceInfo().getInstanceId(); ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> { ProcessorTracker pt = new ProcessorTracker(req); log.info("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) success.", jobId, instanceId); return pt; }); - processorTracker.submitTask(req); + + TaskDO task = new TaskDO(); + + task.setTaskId(req.getTaskId()); + task.setTaskName(req.getTaskName()); + task.setTaskContent(req.getTaskContent()); + task.setFailedCnt(req.getTaskCurrentRetryNums()); + + processorTracker.submitTask(task); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java index 8ab946af..b86117e3 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java @@ -7,11 +7,12 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.oms.worker.persistence.TaskDO; -import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; +import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq; import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.common.response.AskResponse; +import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; @@ -33,6 +34,7 @@ public class TaskTrackerActor extends AbstractActor { .match(ProcessorReportTaskStatusReq.class, this::onReceiveProcessorReportTaskStatusReq) .match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq) .match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest) + .match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq) .matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj)) .build(); } @@ -48,10 +50,7 @@ public class TaskTrackerActor extends AbstractActor { log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); } else { - // 状态转化 - TaskStatus status = TaskStatus.convertStatus(TaskStatus.of(req.getStatus())); - - taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), status.getValue(), req.getResult(), false); + taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), req.getStatus(), req.getResult(), false); } } @@ -102,6 +101,7 @@ public class TaskTrackerActor extends AbstractActor { log.info("[TaskTrackerActor] instance(id={}) pre process finished.", req.getInstanceId()); + // TODO:考虑放到 BroadcastTaskTracker 中去 // 1. 生成集群子任务 boolean success = req.isSuccess(); if (success) { @@ -137,12 +137,18 @@ public class TaskTrackerActor extends AbstractActor { } // 原子创建,防止多实例的存在 - TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> { + TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> new TaskTracker(req)); + } - JobInstanceInfo jobInstanceInfo = new JobInstanceInfo(); - BeanUtils.copyProperties(req, jobInstanceInfo); - - return new TaskTracker(jobInstanceInfo); - }); + /** + * ProcessorTracker 心跳处理器 + */ + private void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) { + TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); + if (taskTracker == null) { + log.warn("[TaskTrackerActor] receive ProcessorTrackerStatusReportReq({}) but system can't find TaskTracker.", req); + return; + } + taskTracker.receiveProcessorTrackerHeartbeat(req); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java index 241ea0de..02c4c29a 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java @@ -1,6 +1,6 @@ package com.github.kfcfans.oms.worker.common; -import com.github.kfcfans.oms.worker.sdk.TaskContext; +import com.github.kfcfans.oms.worker.persistence.TaskDO; import java.util.concurrent.atomic.AtomicLong; @@ -13,7 +13,7 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ThreadLocalStore { - public static final ThreadLocal TASK_CONTEXT_THREAD_LOCAL = new ThreadLocal<>(); + public static final ThreadLocal TASK_THREAD_LOCAL = new ThreadLocal<>(); public static final ThreadLocal TASK_ID_THREAD_LOCAL = new ThreadLocal<>(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java index ea4555da..be8ac5a8 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java @@ -13,19 +13,12 @@ import lombok.Getter; @AllArgsConstructor public enum TaskStatus { - /* ******************* TaskTracker 专用 ******************* */ WAITING_DISPATCH(1, "等待调度器调度"), DISPATCH_SUCCESS_WORKER_UNCHECK(2, "调度成功(但不保证worker收到)"), WORKER_RECEIVED(3, "worker接收成功,但未开始执行"), WORKER_PROCESSING(4, "worker正在执行"), WORKER_PROCESS_FAILED(5, "worker执行失败"), - WORKER_PROCESS_SUCCESS(6, "worker执行成功"), - - /* ******************* Worker 专用 ******************* */ - RECEIVE_SUCCESS(11, "成功接受任务但未开始执行(此时worker满载,暂时无法运行)"), - PROCESSING(12, "执行中"), - PROCESS_FAILED(13, "执行失败"), - PROCESS_SUCCESS(14, "执行成功"); + WORKER_PROCESS_SUCCESS(6, "worker执行成功"); private int value; private String des; @@ -38,14 +31,4 @@ public enum TaskStatus { } throw new IllegalArgumentException("no TaskStatus match the value of " + v); } - - public static TaskStatus convertStatus(TaskStatus processorStatus) { - switch (processorStatus) { - case RECEIVE_SUCCESS: return WORKER_RECEIVED; - case PROCESSING: return WORKER_PROCESSING; - case PROCESS_FAILED: return WORKER_PROCESS_FAILED; - case PROCESS_SUCCESS: return WORKER_PROCESS_SUCCESS; - } - throw new IllegalArgumentException(processorStatus.name() + " is not the processor status."); - } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index 20b8348b..066478d0 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -9,9 +9,10 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.common.utils.SerializerUtils; import com.github.kfcfans.oms.worker.common.utils.SpringUtils; import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory; +import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; +import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq; -import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.oms.worker.sdk.ProcessResult; import com.github.kfcfans.oms.worker.sdk.TaskContext; @@ -20,7 +21,6 @@ import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor; import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor; import com.google.common.base.Stopwatch; import lombok.AllArgsConstructor; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; @@ -37,40 +37,44 @@ import java.util.concurrent.atomic.AtomicLong; @AllArgsConstructor public class ProcessorRunnable implements Runnable { - private final ActorSelection taskTrackerActor; - @Getter - private final TaskTrackerStartTaskReq request; + private InstanceInfo instanceInfo; + private final ActorSelection taskTrackerActor; + private final TaskDO task; @Override public void run() { - String taskId = request.getTaskId(); - String instanceId = request.getInstanceId(); + String taskId = task.getTaskId(); + String instanceId = task.getInstanceId(); - log.debug("[ProcessorRunnable] start to run task(instanceId={}&taskId={}&taskName={})", instanceId, taskId, request.getTaskName()); + log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName()); try { // 0. 完成执行上下文准备 & 上报执行信息 TaskContext taskContext = new TaskContext(); - BeanUtils.copyProperties(request, taskContext); - if (request.getSubTaskContent() != null && request.getSubTaskContent().length > 0) { - taskContext.setSubTask(SerializerUtils.deSerialized(request.getSubTaskContent())); + BeanUtils.copyProperties(task, taskContext); + taskContext.setMaxRetryTimes(instanceInfo.getTaskRetryNum()); + taskContext.setCurrentRetryTimes(task.getFailedCnt()); + taskContext.setJobParams(instanceInfo.getJobParams()); + taskContext.setInstanceParams(instanceInfo.getInstanceParams()); + if (task.getTaskContent() != null && task.getTaskContent().length > 0) { + taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent())); } - ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext); + ThreadLocalStore.TASK_THREAD_LOCAL.set(task); ThreadLocalStore.TASK_ID_THREAD_LOCAL.set(new AtomicLong(0)); - reportStatus(TaskStatus.PROCESSING, null); + reportStatus(TaskStatus.WORKER_PROCESSING, null); // 1. 获取 Processor BasicProcessor processor = getProcessor(); if (processor == null) { - reportStatus(TaskStatus.PROCESS_FAILED, "NO_PROCESSOR"); + reportStatus(TaskStatus.WORKER_PROCESS_FAILED, "NO_PROCESSOR"); return; } // 2. 根任务特殊处理 - ExecuteType executeType = ExecuteType.valueOf(request.getExecuteType()); + ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); if (TaskConstant.ROOT_TASK_ID.equals(taskId)) { // 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task @@ -86,7 +90,7 @@ public class ProcessorRunnable implements Runnable { spReq.setSuccess(processResult.isSuccess()); spReq.setMsg(processResult.getMsg()); }catch (Exception e) { - log.warn("[ProcessorRunnable] broadcast task(instanceId={}) preProcess failed.", instanceId, e); + log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e); spReq.setSuccess(false); spReq.setMsg(e.toString()); } @@ -102,7 +106,7 @@ public class ProcessorRunnable implements Runnable { if (TaskConstant.LAST_TASK_ID.equals(taskId)) { Stopwatch stopwatch = Stopwatch.createStarted(); - log.info("[ProcessorRunnable] instance(instanceId={})' last task(taskId={}) start to process.", instanceId, taskId); + log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId); ProcessResult lastResult; Map taskId2ResultMap = TaskPersistenceService.INSTANCE.getTaskId2ResultMap(instanceId); @@ -124,13 +128,13 @@ public class ProcessorRunnable implements Runnable { } }catch (Exception e) { lastResult = new ProcessResult(false, e.toString()); - log.warn("[ProcessorRunnable] execute last task(instanceId={},taskId={}) failed.", instanceId, taskId, e); + log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e); } - TaskStatus status = lastResult.isSuccess() ? TaskStatus.PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED; + TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; reportStatus(status, lastResult.getMsg()); - log.info("[ProcessorRunnable] instance(instanceId={},taskId={})' last task execute successfully, using time: {}", instanceId, taskId, stopwatch); + log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch); return; } @@ -140,21 +144,20 @@ public class ProcessorRunnable implements Runnable { try { processResult = processor.process(taskContext); }catch (Exception e) { - log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e); + log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e); processResult = new ProcessResult(false, e.toString()); } - reportStatus(processResult.isSuccess() ? TaskStatus.PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED, processResult.getMsg()); + reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, processResult.getMsg()); }catch (Exception e) { - log.error("[ProcessorRunnable] execute failed, please fix this bug!", e); + log.error("[ProcessorRunnable-{}] execute failed, please fix this bug!", instanceId, e); } } private BasicProcessor getProcessor() { BasicProcessor processor = null; - ProcessorType processorType = ProcessorType.valueOf(request.getProcessorType()); - - String processorInfo = request.getProcessorInfo(); + ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType()); + String processorInfo = instanceInfo.getProcessorInfo(); switch (processorType) { case EMBEDDED_JAVA: @@ -163,7 +166,7 @@ public class ProcessorRunnable implements Runnable { try { processor = SpringUtils.getBean(processorInfo); }catch (Exception e) { - log.warn("[ProcessorRunnable] no spring bean of processor(className={}).", processorInfo); + log.warn("[ProcessorRunnable-{}] no spring bean of processor(className={}).", instanceInfo, processorInfo); } } // 反射加载 @@ -181,8 +184,8 @@ public class ProcessorRunnable implements Runnable { private void reportStatus(TaskStatus status, String result) { ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq(); - req.setInstanceId(request.getInstanceId()); - req.setTaskId(request.getTaskId()); + req.setInstanceId(task.getInstanceId()); + req.setTaskId(task.getTaskId()); req.setStatus(status.getValue()); req.setResult(result); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index 7fdb7f26..f1837a5f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -7,16 +7,14 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable; import com.github.kfcfans.oms.worker.persistence.TaskDO; -import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; +import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; +import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; -import org.springframework.util.CollectionUtils; -import java.util.List; import java.util.concurrent.*; /** @@ -30,23 +28,15 @@ public class ProcessorTracker { // 记录创建时间 private long startTime; - private long jobTimeLimitMS; - - // 记录该 Job 相关信息 + // 任务实例信息 + private InstanceInfo instanceInfo; + // 冗余 instanceId,方便日志 private String instanceId; - private String executeType; - private String processorType; - private String processorInfo; - private int threadConcurrency; - private String jobParams; - private String instanceParams; - private int maxRetryTimes; private String taskTrackerAddress; private ActorSelection taskTrackerActorRef; private ThreadPoolExecutor threadPool; - private static final int MAX_QUEUE_SIZE = 20; /** * 创建 ProcessorTracker(其实就是创建了个执行用的线程池 T_T) @@ -55,17 +45,9 @@ public class ProcessorTracker { // 赋值 this.startTime = System.currentTimeMillis(); - this.jobTimeLimitMS = request.getJobTimeLimitMS(); - this.instanceId = request.getInstanceId(); - this.executeType = request.getExecuteType(); - this.processorType = request.getProcessorType(); - this.processorInfo = request.getProcessorInfo(); - this.threadConcurrency = request.getThreadConcurrency(); - this.jobParams = request.getJobParams(); - this.instanceParams = request.getInstanceParams(); - this.maxRetryTimes = request.getMaxRetryTimes(); + this.instanceInfo = request.getInstanceInfo(); + this.instanceId = request.getInstanceInfo().getInstanceId(); this.taskTrackerAddress = request.getTaskTrackerAddress(); - String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskTrackerAddress, AkkaConstant.Task_TRACKER_ACTOR_NAME); this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); @@ -75,49 +57,43 @@ public class ProcessorTracker { } /** - * 提交任务 + * 提交任务到线程池执行 + * 1.0版本:TaskTracker有任务就dispatch,导致 ProcessorTracker 本地可能堆积过多的任务,造成内存压力。为此 ProcessorTracker 在线程 + * 池队列堆积到一定程度时,会将数据持久化到DB,然后通过异步线程定时从数据库中取回任务,重新提交执行。 + * 联动:数据库的SPID设计、TaskStatus段落设计等,全部取消... + * last commitId: 341953aceceafec0fbe7c3d9a3e26451656b945e + * 2.0版本:ProcessorTracker定时向TaskTracker发送心跳消息,心跳消息中包含了当前线程池队列任务个数,TaskTracker根据ProcessorTracker + * 的状态判断能否继续派发任务。因此,ProcessorTracker本地不会堆积过多任务,故删除 持久化机制 ╥﹏╥...! + * @param newTask 需要提交到线程池执行的任务 */ - public void submitTask(TaskTrackerStartTaskReq newTaskReq) { + public void submitTask(TaskDO newTask) { - // 1. 回复接受成功 + // 1. 设置值并提交执行 + newTask.setJobId(instanceInfo.getJobId()); + newTask.setInstanceId(instanceInfo.getInstanceId()); + newTask.setAddress(taskTrackerAddress); + + ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask); + threadPool.submit(processorRunnable); + + // 2. 回复接收成功 ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq(); - BeanUtils.copyProperties(newTaskReq, reportReq); - reportReq.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue()); + reportReq.setInstanceId(instanceId); + reportReq.setTaskId(newTask.getTaskId()); + reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); + + reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); taskTrackerActorRef.tell(reportReq, null); - // 2.1 内存控制,持久化 - if (threadPool.getQueue().size() > MAX_QUEUE_SIZE) { - - TaskDO newTask = new TaskDO(); - BeanUtils.copyProperties(newTaskReq, newTask); - newTask.setTaskContent(newTaskReq.getSubTaskContent()); - newTask.setAddress(newTaskReq.getTaskTrackerAddress()); - newTask.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue()); - newTask.setFailedCnt(newTaskReq.getCurrentRetryTimes()); - newTask.setCreatedTime(System.currentTimeMillis()); - newTask.setLastModifiedTime(System.currentTimeMillis()); - // 特殊处理 instanceId,防止冲突 - newTask.setInstanceId(getSPInstanceId(instanceId)); - - boolean save = TaskPersistenceService.INSTANCE.save(newTask); - if (save) { - log.debug("[ProcessorTracker] persistent task({}) succeed.", newTask); - }else { - log.warn("[ProcessorTracker] persistent task({}) failed.", newTask); - } - return; - } - - // 2.2 提交执行 - ProcessorRunnable processorRunnable = new ProcessorRunnable(taskTrackerActorRef, newTaskReq); - threadPool.submit(processorRunnable); + log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.", + instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size()); } /** * 任务是否超时 */ public boolean isTimeout() { - return System.currentTimeMillis() - startTime > jobTimeLimitMS; + return System.currentTimeMillis() - startTime > instanceInfo.getInstanceTimeoutMS(); } @@ -129,7 +105,7 @@ public class ProcessorTracker { BlockingQueue queue = new LinkedBlockingQueue<>(); // 自定义线程池中线程名称 ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build(); - threadPool = new ThreadPoolExecutor(threadConcurrency, threadConcurrency, 60L, TimeUnit.SECONDS, queue, threadFactory); + threadPool = new ThreadPoolExecutor(instanceInfo.getTaskRetryNum(), instanceInfo.getTaskRetryNum(), 60L, TimeUnit.SECONDS, queue, threadFactory); // 当没有任务执行时,允许销毁核心线程(即线程池最终存活线程个数可能为0) threadPool.allowCoreThreadTimeOut(true); } @@ -141,69 +117,25 @@ public class ProcessorTracker { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build(); ScheduledExecutorService timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory); - timingPool.scheduleWithFixedDelay(new PoolStatusCheckRunnable(), 60, 10, TimeUnit.SECONDS); + timingPool.scheduleAtFixedRate(new TimingStatusReportRunnable(), 0, 15, TimeUnit.SECONDS); } /** - * 定期检查线程池运行状态(内存中的任务数量不足,则即使从数据库中获取并提交执行) + * 定时向 TaskTracker 汇报(携带任务执行信息的心跳) */ - private class PoolStatusCheckRunnable implements Runnable { + private class TimingStatusReportRunnable implements Runnable { @Override public void run() { - int queueSize = threadPool.getQueue().size(); - if (queueSize >= MAX_QUEUE_SIZE / 2) { - return; - } + // 1. 查询数据库中等待执行的任务数量 + long waitingNum = threadPool.getQueue().size(); - TaskPersistenceService taskPersistenceService = TaskPersistenceService.INSTANCE; - - // 查询时也用特殊处理的 instanceId 查即可 - List taskDOList =taskPersistenceService.getTaskByStatus(getSPInstanceId(instanceId), TaskStatus.RECEIVE_SUCCESS, MAX_QUEUE_SIZE / 2); - - if (CollectionUtils.isEmpty(taskDOList)) { - return; - } - - List deletedIds = Lists.newLinkedList(); - - log.debug("[ProcessorTracker] timing add task to thread pool."); - - // 提交到线程池执行 - taskDOList.forEach(task -> { - - // 还原 instanceId - task.setInstanceId(instanceId); - runTask(task); - deletedIds.add(task.getTaskId()); - }); - - // 删除任务(需要使用特殊instanceId) - taskPersistenceService.batchDelete(getSPInstanceId(instanceId), deletedIds); - } - - private void runTask(TaskDO task) { - TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq(); - BeanUtils.copyProperties(task, req); - - req.setExecuteType(executeType); - req.setProcessorType(processorType); - req.setProcessorInfo(processorInfo); - req.setTaskTrackerAddress(taskTrackerAddress); - req.setJobParams(jobParams); - req.setInstanceParams(instanceParams); - req.setSubTaskContent(task.getTaskContent()); - req.setMaxRetryTimes(maxRetryTimes); - req.setCurrentRetryTimes(task.getFailedCnt()); - - ProcessorRunnable processorRunnable = new ProcessorRunnable(taskTrackerActorRef, req); - threadPool.submit(processorRunnable); + // 2. 发送请求 + ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceInfo.getInstanceId(), waitingNum); + taskTrackerActorRef.tell(req, null); } } - private static String getSPInstanceId(String instanceId) { - return "L" + instanceId; - } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index d560edf3..c7f7262c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -4,6 +4,7 @@ import akka.actor.ActorSelection; import akka.pattern.Patterns; import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.JobInstanceStatus; +import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.oms.worker.OhMyWorker; @@ -15,14 +16,19 @@ import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.NetUtils; import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; -import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; +import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; +import com.github.kfcfans.oms.worker.pojo.model.ProcessorTrackerStatus; +import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStopInstanceReq; +import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Getter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -33,6 +39,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * 负责管理 JobInstance 的运行,主要包括任务的派发(MR可能存在大量的任务)和状态的更新 @@ -44,44 +51,48 @@ import java.util.concurrent.atomic.AtomicBoolean; @ToString public class TaskTracker { - protected long startTime; - protected long jobTimeLimitMS; - + private long startTime; // 任务实例信息 - protected JobInstanceInfo jobInstanceInfo; + private InstanceInfo instanceInfo; @Getter - protected List allWorkerAddress; + private List allWorkerAddress; + private Map pTAddress2Status; - protected TaskPersistenceService taskPersistenceService; - protected ScheduledExecutorService scheduledPool; + // 数据库持久化服务 + private TaskPersistenceService taskPersistenceService; + // 定时任务线程池 + private ScheduledExecutorService scheduledPool; - protected AtomicBoolean finished = new AtomicBoolean(false); + private AtomicBoolean finished = new AtomicBoolean(false); - public TaskTracker(JobInstanceInfo jobInstanceInfo) { - - log.info("[TaskTracker] start to create TaskTracker for instance({}).", jobInstanceInfo); + public TaskTracker(ServerScheduleJobReq req) { this.startTime = System.currentTimeMillis(); - this.jobTimeLimitMS = jobInstanceInfo.getTimeLimit(); - this.jobInstanceInfo = jobInstanceInfo; + // 1. 初始化成员变量 + this.instanceInfo = new InstanceInfo(); + BeanUtils.copyProperties(req, instanceInfo); + allWorkerAddress = CommonSJ.commaSplitter.splitToList(req.getAllWorkerAddress()); + pTAddress2Status = Maps.newConcurrentMap(); + allWorkerAddress.forEach(ip -> { + ProcessorTrackerStatus pts = new ProcessorTrackerStatus(); + pts.init(ip); + pTAddress2Status.put(ip, pts); + }); this.taskPersistenceService = TaskPersistenceService.INSTANCE; ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build(); this.scheduledPool = Executors.newScheduledThreadPool(2, factory); - allWorkerAddress = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getAllWorkerAddress()); - - // 持久化根任务 + // 2. 持久化根任务 persistenceRootTask(); - // 定时任务1:任务派发 + // 3. 启动定时任务(任务派发 & 状态检查) scheduledPool.scheduleWithFixedDelay(new DispatcherRunnable(), 0, 5, TimeUnit.SECONDS); - - // 定时任务2:状态检查 scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS); + log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", req.getInstanceId(), req); } @@ -102,7 +113,7 @@ public class TaskTracker { Optional dbTaskStatusOpt = taskPersistenceService.getTaskStatus(instanceId, taskId); if (!dbTaskStatusOpt.isPresent()) { - log.warn("[TaskTracker] get task status failed when try to update new task status, current params is instanceId={},taskId={},newStatus={}.", + log.warn("[TaskTracker-{}] query TaskStatus from DB failed when try to update new TaskStatus(taskId={},newStatus={}).", instanceId, taskId, newStatus); } @@ -110,7 +121,7 @@ public class TaskTracker { if (dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH).getValue() > newStatus) { // 必存在,但不怎么写,Java会警告... TaskStatus dbTaskStatus = dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH); - log.warn("[TaskTracker] task(instanceId={},taskId={},dbStatus={},requestStatus={}) status conflict, taskTracker won't update the status.", + log.warn("[TaskTracker-{}] task(taskId={},dbStatus={},requestStatus={}) status conflict, TaskTracker won't update the status.", instanceId, taskId, dbTaskStatus, nTaskStatus); return; } @@ -119,11 +130,11 @@ public class TaskTracker { if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) { // 数据库查询失败的话,就只重试一次 - int failedCnt = taskPersistenceService.getTaskFailedCnt(instanceId, taskId).orElse(jobInstanceInfo.getTaskRetryNum() - 1); - if (failedCnt < jobInstanceInfo.getTaskRetryNum()) { + int failedCnt = taskPersistenceService.getTaskFailedCnt(instanceId, taskId).orElse(instanceInfo.getTaskRetryNum() - 1); + if (failedCnt < instanceInfo.getTaskRetryNum()) { boolean retryTask = taskPersistenceService.updateRetryTask(instanceId, taskId, failedCnt + 1); if (retryTask) { - log.info("[TaskTracker] task(instanceId={},taskId={}) will have a retry.", instanceId, taskId); + log.info("[TaskTracker-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, taskId); return; } } @@ -134,7 +145,7 @@ public class TaskTracker { } if (!updateResult) { - log.warn("[TaskTracker] update task status failed, this task(instanceId={}&taskId={}) may be processed repeatedly!", instanceId, taskId); + log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId); } } @@ -150,18 +161,28 @@ public class TaskTracker { // 基础处理(多循环一次虽然有些浪费,但分布式执行中,这点耗时绝不是主要占比,忽略不计!) newTaskList.forEach(task -> { - task.setJobId(jobInstanceInfo.getJobId()); - task.setInstanceId(jobInstanceInfo.getInstanceId()); + task.setJobId(instanceInfo.getJobId()); + task.setInstanceId(instanceInfo.getInstanceId()); task.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); task.setFailedCnt(0); task.setLastModifiedTime(System.currentTimeMillis()); task.setCreatedTime(System.currentTimeMillis()); }); - log.debug("[TaskTracker] JobInstance(id={}) add tasks: {}", jobInstanceInfo.getInstanceId(), newTaskList); + log.debug("[TaskTracker-{}] receive new tasks: {}", instanceInfo.getInstanceId(), newTaskList); return taskPersistenceService.batchSave(newTaskList); } + /** + * ProcessorTracker 上报健康状态 + */ + public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) { + + ProcessorTrackerStatus processorTrackerStatus = pTAddress2Status.get(heartbeatReq.getIp()); + processorTrackerStatus.update(heartbeatReq); + + } + public boolean finished() { return finished.get(); } @@ -170,7 +191,7 @@ public class TaskTracker { * 任务是否超时 */ public boolean isTimeout() { - return System.currentTimeMillis() - startTime > jobTimeLimitMS; + return System.currentTimeMillis() - startTime > instanceInfo.getInstanceTimeoutMS(); } /** @@ -180,8 +201,8 @@ public class TaskTracker { TaskDO rootTask = new TaskDO(); rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); - rootTask.setJobId(jobInstanceInfo.getJobId()); - rootTask.setInstanceId(jobInstanceInfo.getInstanceId()); + rootTask.setJobId(instanceInfo.getJobId()); + rootTask.setInstanceId(instanceInfo.getInstanceId()); rootTask.setTaskId(TaskConstant.ROOT_TASK_ID); rootTask.setFailedCnt(0); rootTask.setAddress(NetUtils.getLocalHost()); @@ -192,7 +213,7 @@ public class TaskTracker { if (!taskPersistenceService.save(rootTask)) { throw new RuntimeException("create root task failed."); } - log.info("[TaskTracker] create root task successfully for instance(instanceId={}).", jobInstanceInfo.getInstanceId()); + log.info("[TaskTracker-{}] create root task successfully.", instanceInfo.getInstanceId()); } @@ -205,34 +226,68 @@ public class TaskTracker { */ private class DispatcherRunnable implements Runnable { + // 数据库查询限制,每次最多查询几个任务 + private static final int DB_QUERY_LIMIT = 100; + @Override public void run() { - taskPersistenceService.getTaskByStatus(jobInstanceInfo.getInstanceId(), TaskStatus.WAITING_DISPATCH, 100).forEach(task -> { - try { - // 构造 worker 执行请求 - TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq(jobInstanceInfo, task); + Stopwatch stopwatch = Stopwatch.createStarted(); + String instanceId = instanceInfo.getInstanceId(); - // 构造 akka 可访问节点路径 - String targetIP = task.getAddress(); - if (StringUtils.isEmpty(targetIP)) { - targetIP = allWorkerAddress.get(ThreadLocalRandom.current().nextInt(allWorkerAddress.size())); + // 1. 获取可以派发任务的 ProcessorTracker + List availablePtIps = Lists.newLinkedList(); + pTAddress2Status.forEach((ip, ptStatus) -> { + if (ptStatus.available()) { + availablePtIps.add(ip); + } + }); + + // 2. 没有可用 ProcessorTracker,本次不派发 + if (availablePtIps.isEmpty()) { + log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId); + return; + } + + // 3. 避免大查询,分批派发任务 + long currentDispatchNum = 0; + long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency(); + AtomicInteger index = new AtomicInteger(0); + + // 4. 循环查询数据库,获取需要派发的任务 + while (maxDispatchNum > currentDispatchNum) { + + List needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, DB_QUERY_LIMIT); + currentDispatchNum += needDispatchTasks.size(); + + needDispatchTasks.forEach(task -> { + + TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task); + + // 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address + String ptAddress = task.getAddress(); + if (StringUtils.isEmpty(ptAddress)) { + ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); } - String targetPath = AkkaUtils.getAkkaRemotePath(targetIP, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME); - ActorSelection targetActor = OhMyWorker.actorSystem.actorSelection(targetPath); - - // 发送请求(Akka的tell是至少投递一次,经实验表明无法投递消息也不会报错...印度啊...) - targetActor.tell(req, null); + String ptActorPath = AkkaUtils.getAkkaRemotePath(ptAddress, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME); + ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath); + ptActor.tell(startTaskReq, null); + // 更新 ProcessorTrackerStatus 状态 + pTAddress2Status.get(ptAddress).setDispatched(true); // 更新数据库(如果更新数据库失败,可能导致重复执行,先不处理) taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, null); - log.debug("[TaskTracker] dispatch task({instanceId={},taskId={},taskName={}} successfully.)", task.getInstanceId(), task.getTaskId(), task.getTaskName()); - }catch (Exception e) { - // 调度失败,不修改数据库,下次重新随机派发给 remote actor - log.warn("[TaskTracker] dispatch task({}) failed.", task); + log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}} successfully.)", task.getInstanceId(), task.getTaskId(), task.getTaskName()); + }); + + // 数量不足 或 查询失败,则终止循环 + if (needDispatchTasks.size() < DB_QUERY_LIMIT) { + return; } - }); + } + + log.debug("[TaskTracker-{}] dispatch {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch); } } @@ -243,10 +298,9 @@ public class TaskTracker { private static final long TIME_OUT_MS = 5000; - private void innerRun() { - final String instanceId = jobInstanceInfo.getInstanceId(); + final String instanceId = instanceInfo.getInstanceId(); // 1. 查询统计信息 Map status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId); @@ -261,10 +315,10 @@ public class TaskTracker { long finishedNum = succeedNum + failedNum; long unfinishedNum = waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum; - log.debug("[TaskTracker] status check result: {}", status2Num); + log.debug("[TaskTracker-{}] status check result: {}", instanceId, status2Num); TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq(); - req.setJobId(jobInstanceInfo.getJobId()); + req.setJobId(instanceInfo.getJobId()); req.setInstanceId(instanceId); req.setTotalTaskNum(finishedNum + unfinishedNum); req.setSucceedTaskNum(succeedNum); @@ -275,13 +329,13 @@ public class TaskTracker { if (unfinishedNum == 0) { boolean finishedBoolean = true; - ExecuteType executeType = ExecuteType.valueOf(jobInstanceInfo.getExecuteType()); + ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); if (executeType == ExecuteType.STANDALONE) { List allTask = taskPersistenceService.getAllTask(instanceId); if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) { - log.warn("[TaskTracker] there must have some bug in TaskTracker."); + log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId); }else { resultTask = allTask.get(0); } @@ -328,7 +382,7 @@ public class TaskTracker { AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(TIME_OUT_MS, TimeUnit.MILLISECONDS); serverAccepted = askResponse.isSuccess(); }catch (Exception e) { - log.warn("[TaskTracker] report finished instance(id={}&result={}) failed.", instanceId, resultTask.getResult()); + log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, resultTask.getResult()); } // 服务器未接受上报,则等待下次重新上报 @@ -337,8 +391,8 @@ public class TaskTracker { } // 服务器已经更新状态,任务已经执行完毕,开始释放所有资源 - log.info("[TaskTracker] instance(jobId={}&instanceId={}) process finished,result = {}, start to release resource...", - jobInstanceInfo.getJobId(), instanceId, resultTask.getResult()); + log.info("[TaskTracker-{}] instance(jobId={}) process finished,result = {}, start to release resource...", + instanceId, instanceInfo.getJobId(), resultTask.getResult()); TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq(); stopRequest.setInstanceId(instanceId); allWorkerAddress.forEach(ptIP -> { @@ -367,7 +421,7 @@ public class TaskTracker { long elapsedTime = currentMS - uncheckTask.getLastModifiedTime(); if (elapsedTime > TIME_OUT_MS) { updateTaskStatus(instanceId, uncheckTask.getTaskId(), TaskStatus.WAITING_DISPATCH.getValue(), null, true); - log.warn("[TaskTracker] task(instanceId={},taskId={}) try to dispatch again due to unreceived the response from processor tracker.", + log.warn("[TaskTracker-{}] task(taskId={}) try to dispatch again due to unreceived the response from processor tracker.", instanceId, uncheckTask.getTaskId()); } @@ -383,7 +437,7 @@ public class TaskTracker { try { innerRun(); }catch (Exception e) { - log.warn("[TaskTracker] status checker execute failed, please fix the bug (@tjq)!", e); + log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceInfo.getInstanceId(), e); } } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java index 20bdd7c3..04d60ad4 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.worker.persistence; +import com.github.kfcfans.common.utils.CommonUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.springframework.util.CollectionUtils; @@ -59,7 +60,7 @@ public class TaskDAOImpl implements TaskDAO { @Override public boolean batchDelete(String instanceId, List taskIds) throws SQLException { String deleteSQL = "delete from task_info where instance_id = '%s' and task_id in %s"; - String sql = String.format(deleteSQL, instanceId, getInStringCondition(taskIds)); + String sql = String.format(deleteSQL, instanceId, CommonUtils.getInStringCondition(taskIds)); try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { stat.executeUpdate(sql); return true; @@ -180,12 +181,5 @@ public class TaskDAOImpl implements TaskDAO { ps.setLong(11, task.getLastModifiedTime()); } - private static String getInStringCondition(Collection collection) { - if (CollectionUtils.isEmpty(collection)) { - return "()"; - } - StringBuilder sb = new StringBuilder(" ( "); - collection.forEach(str -> sb.append("'").append(str).append("',")); - return sb.replace(sb.length() -1, sb.length(), " ) ").toString(); - } + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index 45527d58..1024c9c0 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -149,6 +149,25 @@ public class TaskPersistenceService { return Maps.newHashMap(); } + /** + * 获取等待执行的任务数量 (ProcessorTracker侧) + * @param spInstanceId 特殊处理的 instanceId + * @return 数量 + */ + public Optional getWaitingToRunTaskNum(String spInstanceId) { + try { + return execute(() -> { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setQueryContent("count(*) as num"); + Long num = Long.parseLong(taskDAO.simpleQueryPlus(query).get(0).get("NUM").toString()); + return Optional.of(num); + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] getWaitingToRunTaskNum for instance(id={}) failed.", spInstanceId, e); + } + return Optional.empty(); + } + /** * 查询 taskId -> taskResult,reduce阶段或postProcess 阶段使用 */ @@ -162,7 +181,7 @@ public class TaskPersistenceService { } /** - * 查询任务状态(只查询 status,节约 I/O 资源) + * 查询任务状态(只查询 status,节约 I/O 资源 -> 测试表明,效果惊人...磁盘I/O果然是重要瓶颈...) */ public Optional getTaskStatus(String instanceId, String taskId) { @@ -216,6 +235,29 @@ public class TaskPersistenceService { return false; } + /** + * 批量更新 Task 状态 + */ + public boolean batchUpdateTaskStatus(String instanceId, List taskIds, TaskStatus status, String result) { + try { + return execute(() -> { + + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setQueryCondition(String.format(" task_id in %s ", CommonUtils.getInStringCondition(taskIds))); + + TaskDO updateEntity = new TaskDO(); + updateEntity.setStatus(status.getValue()); + updateEntity.setResult(result); + return taskDAO.simpleUpdate(query, updateEntity); + }); + }catch (Exception e) { + log.error("[TaskPersistenceService] updateTaskStatus failed, instanceId={},taskIds={},status={},result={}.", + instanceId, taskIds, status, result, e); + } + return false; + } + public boolean updateRetryTask(String instanceId, String taskId, int failedCnt) { try { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/JobInstanceInfo.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java similarity index 61% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/JobInstanceInfo.java rename to oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java index bc7a601f..0a7c45ad 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/JobInstanceInfo.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java @@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.pojo.model; import lombok.Data; +import java.io.Serializable; + /** * 被调度执行的任务实例详情 * @@ -9,25 +11,40 @@ import lombok.Data; * @since 2020/3/16 */ @Data -public class JobInstanceInfo { +public class InstanceInfo implements Serializable { + /** + * 基础信息 + */ private String jobId; private String instanceId; + + /** + * 任务执行处理器信息 + */ // 任务执行类型,单机、广播、MR private String executeType; // 处理器类型(JavaBean、Jar、脚本等) private String processorType; // 处理器信息 private String processorInfo; - // 任务执行时间限制,单位毫秒 - private long timeLimit; - // 可用处理器地址,可能多值,逗号分隔 - private String allWorkerAddress; + /** + * 超时时间 + */ + // 整个任务的总体超时时间 + private long instanceTimeoutMS; + // Task的超时时间 + private long taskTimeoutMS; + + /** + * 任务运行参数 + */ + // 任务级别的参数,相当于类的static变量 private String jobParams; + // 实例级别的参数,相当于类的普通变量 private String instanceParams; - /* *********************** Map/MapReduce 任务专用 *********************** */ // 每台机器的处理线程数上限 private int threadConcurrency; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/ProcessorTrackerStatus.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/ProcessorTrackerStatus.java new file mode 100644 index 00000000..4dd712df --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/ProcessorTrackerStatus.java @@ -0,0 +1,80 @@ +package com.github.kfcfans.oms.worker.pojo.model; + +import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * ProcessorTracker 的状态 + * + * @author tjq + * @since 2020/3/27 + */ +@Data +@NoArgsConstructor +public class ProcessorTrackerStatus { + + private static final int DISPATCH_THRESHOLD = 20; + private static final int HEARTBEAT_TIMEOUT_MS = 60000; + + // 冗余存储一份 IP 地址 + private String ip; + // 上次活跃时间 + private long lastActiveTime; + // 等待执行任务数 + private long remainTaskNum; + // 是否被派发过任务 + private boolean dispatched; + + public void init(String ip) { + this.ip = ip; + this.lastActiveTime = System.currentTimeMillis(); + this.remainTaskNum = 0; + this.dispatched = false; + } + + public void update(ProcessorTrackerStatusReportReq req) { + + // 延迟到达的请求,直接忽略 + if (req.getTime() <= lastActiveTime) { + return; + } + + this.ip = req.getIp(); + this.lastActiveTime = req.getTime(); + this.remainTaskNum = req.getRemainTaskNum(); + this.dispatched = true; + } + + /** + * 是否可用 + */ + public boolean available() { + + // 未曾派发过,默认可用 + if (!dispatched) { + return true; + } + + // 长时间未收到心跳消息,则不可用 + if (isTimeout()) { + return false; + } + + // 留有过多待处理任务,则不可用 + if (remainTaskNum >= DISPATCH_THRESHOLD) { + return false; + } + + // TODO:后续考虑加上机器健康度等信息 + + return true; + } + + /** + * 是否超时(超过一定时间没有收到心跳) + */ + public boolean isTimeout() { + return System.currentTimeMillis() - lastActiveTime > HEARTBEAT_TIMEOUT_MS; + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java index d642cbdc..8b363fe4 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java @@ -2,7 +2,7 @@ package com.github.kfcfans.oms.worker.pojo.request; import com.github.kfcfans.oms.worker.common.ThreadLocalStore; import com.github.kfcfans.oms.worker.common.utils.SerializerUtils; -import com.github.kfcfans.oms.worker.sdk.TaskContext; +import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Getter; @@ -34,15 +34,15 @@ public class ProcessorMapTaskRequest implements Serializable { private byte[] taskContent; } - public ProcessorMapTaskRequest(TaskContext taskContext, List subTaskList, String taskName) { + public ProcessorMapTaskRequest(TaskDO parentTask, List subTaskList, String taskName) { - this.instanceId = taskContext.getInstanceId(); + this.instanceId = parentTask.getInstanceId(); this.taskName = taskName; this.subTasks = Lists.newLinkedList(); subTaskList.forEach(subTask -> { // 同一个 Task 内部可能多次 Map,因此还是要确保线程级别的唯一 - String subTaskId = taskContext.getTaskId() + "." + ThreadLocalStore.TASK_ID_THREAD_LOCAL.get().getAndIncrement(); + String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.TASK_ID_THREAD_LOCAL.get().getAndIncrement(); // 写入类名,方便反序列化 subTasks.add(new SubTask(subTaskId, SerializerUtils.serialize(subTask))); }); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java new file mode 100644 index 00000000..525414f7 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java @@ -0,0 +1,41 @@ +package com.github.kfcfans.oms.worker.pojo.request; + +import com.github.kfcfans.oms.worker.common.utils.NetUtils; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * ProcessorTracker 定时向 TaskTracker 上报健康状态 + * + * @author tjq + * @since 2020/3/27 + */ +@Data +@NoArgsConstructor +public class ProcessorTrackerStatusReportReq { + + private String instanceId; + + /** + * 请求发起时间 + */ + private long time; + + /** + * 等待执行的任务数量,内存队列数 + 数据库持久数 + */ + private long remainTaskNum; + + /** + * 本机地址 + */ + private String ip; + + public ProcessorTrackerStatusReportReq(String instanceId, long remainTaskNum) { + this.instanceId = instanceId; + this.remainTaskNum = remainTaskNum; + + this.time = System.currentTimeMillis(); + this.ip = NetUtils.getLocalHost(); + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java index 99711eb4..0d0a080c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -2,7 +2,7 @@ package com.github.kfcfans.oms.worker.pojo.request; import com.github.kfcfans.oms.worker.common.utils.NetUtils; import com.github.kfcfans.oms.worker.persistence.TaskDO; -import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; +import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; @@ -10,7 +10,7 @@ import lombok.Setter; import java.io.Serializable; /** - * JobTracker 派发 task 进行执行 + * TaskTracker 派发 task 进行执行 * * @author tjq * @since 2020/3/17 @@ -20,52 +20,26 @@ import java.io.Serializable; @NoArgsConstructor public class TaskTrackerStartTaskReq implements Serializable { - private String jobId; - private String instanceId; - // 任务执行类型,单机、广播、MR - private String executeType; - // 处理器类型(JavaBean、Jar、脚本等) - private String processorType; - // 处理器信息 - private String processorInfo; - // 并发计算线程数 - private int threadConcurrency; // TaskTracker 地址 private String taskTrackerAddress; - // 任务超时时间 - private long jobTimeLimitMS; - - private String jobParams; - private String instanceParams; + private InstanceInfo instanceInfo; private String taskId; private String taskName; - private byte[] subTaskContent; - // 子任务允许的重试次数 - private int maxRetryTimes; + private byte[] taskContent; // 子任务当前重试次数 - private int currentRetryTimes; + private int taskCurrentRetryNums; - public TaskTrackerStartTaskReq(JobInstanceInfo instanceInfo, TaskDO task) { + public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task) { - jobId = instanceInfo.getJobId(); - instanceId = instanceInfo.getInstanceId(); - processorType = instanceInfo.getProcessorType(); - processorInfo = instanceInfo.getProcessorInfo(); - threadConcurrency = instanceInfo.getThreadConcurrency(); - executeType = instanceInfo.getExecuteType(); - taskTrackerAddress = NetUtils.getLocalHost(); - jobTimeLimitMS = instanceInfo.getTimeLimit(); + this.taskTrackerAddress = NetUtils.getLocalHost(); + this.instanceInfo = instanceInfo; - jobParams = instanceInfo.getJobParams(); - instanceParams = instanceInfo.getInstanceParams(); + this.taskId = task.getTaskId(); + this.taskName = task.getTaskName(); + this.taskContent = task.getTaskContent(); - taskId = task.getTaskId(); - taskName = task.getTaskName(); - subTaskContent = task.getTaskContent(); - - maxRetryTimes = instanceInfo.getTaskRetryNum(); - currentRetryTimes = task.getFailedCnt(); + this.taskCurrentRetryNums = task.getFailedCnt(); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java index 952f42cf..4deca2a3 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java @@ -31,7 +31,6 @@ public class TaskContext { private Object subTask; - private String taskTrackerAddress; public String getDescription() { @@ -40,8 +39,7 @@ public class TaskContext { ", taskId='" + taskId + '\'' + ", taskName='" + taskName + '\'' + ", jobParams='" + jobParams + '\'' + - ", instanceParams='" + instanceParams + '\'' + - ", taskTrackerAddress='" + taskTrackerAddress; + ", instanceParams='" + instanceParams; } @Override diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java index 7e2536e7..62766936 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java @@ -7,6 +7,7 @@ import com.github.kfcfans.oms.worker.common.ThreadLocalStore; import com.github.kfcfans.common.AkkaConstant; import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; +import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest; import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.oms.worker.sdk.TaskContext; @@ -48,15 +49,15 @@ public abstract class MapReduceProcessor implements BasicProcessor { log.warn("[MapReduceProcessor] map task size is too large, network maybe overload... please try to split the tasks."); } - TaskContext taskContext = ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.get(); + TaskDO task = ThreadLocalStore.TASK_THREAD_LOCAL.get(); // 1. 构造请求 - ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(taskContext, taskList, taskName); + ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName); // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) boolean requestSucceed = false; try { - String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskContext.getTaskTrackerAddress(), AkkaConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaRemotePath(task.getAddress(), AkkaConstant.Task_TRACKER_ACTOR_NAME); ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); CompletionStage requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS)); AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -73,8 +74,8 @@ public abstract class MapReduceProcessor implements BasicProcessor { } public boolean isRootTask() { - TaskContext taskContext = ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.get(); - return TaskConstant.ROOT_TASK_ID.equals(taskContext.getTaskId()); + TaskDO task = ThreadLocalStore.TASK_THREAD_LOCAL.get(); + return TaskConstant.ROOT_TASK_ID.equals(task.getTaskId()); } public abstract ProcessResult reduce(TaskContext taskContext, Map taskId2Result); diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java index 310c9708..5ef3cfbf 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java @@ -32,7 +32,7 @@ public class PersistenceServiceTest { task.setInstanceId("10086" + ThreadLocalRandom.current().nextInt(2)); task.setTaskId(i + ""); task.setFailedCnt(0); - task.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue()); + task.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); task.setTaskName("ROOT_TASK"); task.setLastModifiedTime(System.currentTimeMillis()); task.setCreatedTime(System.currentTimeMillis()); diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java index 1c1be419..c95bd5e0 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java @@ -9,6 +9,7 @@ import com.github.kfcfans.oms.worker.common.OhMyConfig; import com.github.kfcfans.common.AkkaConstant; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.NetUtils; +import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; import com.typesafe.config.ConfigFactory; import org.junit.jupiter.api.AfterAll; @@ -60,20 +61,30 @@ public class ProcessorTrackerTest { } private static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) { + + InstanceInfo instanceInfo = new InstanceInfo(); + + instanceInfo.setJobId("1"); + instanceInfo.setInstanceId("10086"); + + instanceInfo.setExecuteType(ExecuteType.STANDALONE.name()); + instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name()); + instanceInfo.setProcessorInfo(processor); + + instanceInfo.setInstanceTimeoutMS(500000); + instanceInfo.setTaskTimeoutMS(5000000); + + instanceInfo.setThreadConcurrency(5); + instanceInfo.setTaskRetryNum(3); + TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq(); - req.setJobId("1"); - req.setInstanceId("10086"); + + req.setTaskTrackerAddress(NetUtils.getLocalHost()); + req.setInstanceInfo(instanceInfo); + req.setTaskId("0"); req.setTaskName("ROOT_TASK"); - req.setMaxRetryTimes(3); - req.setCurrentRetryTimes(0); - - req.setExecuteType(ExecuteType.STANDALONE.name()); - req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name()); - req.setProcessorInfo(processor); - req.setThreadConcurrency(5); - req.setTaskTrackerAddress(NetUtils.getLocalHost()); - req.setJobTimeLimitMS(123132); + req.setTaskCurrentRetryNums(0); return req; } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java index 689e5d25..32246303 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java @@ -63,7 +63,8 @@ public class TaskTrackerTest { req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name()); req.setTaskRetryNum(3); req.setThreadConcurrency(20); - req.setTimeLimit(500000); + req.setInstanceTimeoutMS(500000); + req.setTaskTimeoutMS(500000); switch (executeType) { case STANDALONE: