mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
new dispatch mechanism & remove ProcessorTracker's task persistence
This commit is contained in:
parent
341953acec
commit
f844dd8db1
@ -15,29 +15,49 @@ public class ServerScheduleJobReq implements Serializable {
|
||||
|
||||
// 调度的服务器地址,默认通讯目标
|
||||
private String serverAddress;
|
||||
// 可用处理器地址,可能多值,逗号分隔
|
||||
private String allWorkerAddress;
|
||||
|
||||
/* *********************** 任务相关属性 *********************** */
|
||||
|
||||
/**
|
||||
* 基础信息
|
||||
*/
|
||||
private String jobId;
|
||||
private String instanceId;
|
||||
|
||||
/**
|
||||
* 任务执行处理器信息
|
||||
*/
|
||||
// 任务执行类型,单机、广播、MR
|
||||
private String executeType;
|
||||
// 处理器类型(JavaBean、Jar、脚本等)
|
||||
private String processorType;
|
||||
// 处理器信息
|
||||
private String processorInfo;
|
||||
// 任务执行时间限制,单位毫秒
|
||||
private long timeLimit;
|
||||
// 可用处理器地址,可能多值,逗号分隔
|
||||
private String allWorkerAddress;
|
||||
|
||||
|
||||
/**
|
||||
* 超时时间
|
||||
*/
|
||||
// 整个任务的总体超时时间
|
||||
private long instanceTimeoutMS;
|
||||
// Task的超时时间
|
||||
private long taskTimeoutMS;
|
||||
|
||||
/**
|
||||
* 任务运行参数
|
||||
*/
|
||||
// 任务级别的参数,相当于类的static变量
|
||||
private String jobParams;
|
||||
// 实例级别的参数,相当于类的普通变量
|
||||
private String instanceParams;
|
||||
|
||||
/* *********************** Map/MapReduce 任务专用 *********************** */
|
||||
|
||||
// 每台机器的处理线程数上限
|
||||
private int threadConcurrency;
|
||||
// 子任务重试次数(任务本身的重试机制由server控制)
|
||||
private int taskRetryNum;
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package com.github.kfcfans.common.utils;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
||||
@ -61,4 +62,17 @@ public class CommonUtils {
|
||||
}
|
||||
return booleanExecutor.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成数据库查询语句 in 后的条件
|
||||
* ["a", "b", "c"] -> ('a','b','c')
|
||||
*/
|
||||
public static String getInStringCondition(Collection<String> collection) {
|
||||
if (collection == null || collection.isEmpty()) {
|
||||
return "()";
|
||||
}
|
||||
StringBuilder sb = new StringBuilder(" ( ");
|
||||
collection.forEach(str -> sb.append("'").append(str).append("',"));
|
||||
return sb.replace(sb.length() -1, sb.length(), " ) ").toString();
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package com.github.kfcfans.oms.worker.actors;
|
||||
import akka.actor.AbstractActor;
|
||||
import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTracker;
|
||||
import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTrackerPool;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@ -27,13 +28,21 @@ public class ProcessorTrackerActor extends AbstractActor {
|
||||
* 处理来自TaskTracker的task执行请求
|
||||
*/
|
||||
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
|
||||
String jobId = req.getJobId();
|
||||
String instanceId = req.getInstanceId();
|
||||
String jobId = req.getInstanceInfo().getJobId();
|
||||
String instanceId = req.getInstanceInfo().getInstanceId();
|
||||
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> {
|
||||
ProcessorTracker pt = new ProcessorTracker(req);
|
||||
log.info("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) success.", jobId, instanceId);
|
||||
return pt;
|
||||
});
|
||||
processorTracker.submitTask(req);
|
||||
|
||||
TaskDO task = new TaskDO();
|
||||
|
||||
task.setTaskId(req.getTaskId());
|
||||
task.setTaskName(req.getTaskName());
|
||||
task.setTaskContent(req.getTaskContent());
|
||||
task.setFailedCnt(req.getTaskCurrentRetryNums());
|
||||
|
||||
processorTracker.submitTask(task);
|
||||
}
|
||||
}
|
||||
|
@ -7,11 +7,12 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
|
||||
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker;
|
||||
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
|
||||
import com.github.kfcfans.common.response.AskResponse;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
@ -33,6 +34,7 @@ public class TaskTrackerActor extends AbstractActor {
|
||||
.match(ProcessorReportTaskStatusReq.class, this::onReceiveProcessorReportTaskStatusReq)
|
||||
.match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq)
|
||||
.match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest)
|
||||
.match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq)
|
||||
.matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj))
|
||||
.build();
|
||||
}
|
||||
@ -48,10 +50,7 @@ public class TaskTrackerActor extends AbstractActor {
|
||||
log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
|
||||
} else {
|
||||
|
||||
// 状态转化
|
||||
TaskStatus status = TaskStatus.convertStatus(TaskStatus.of(req.getStatus()));
|
||||
|
||||
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), status.getValue(), req.getResult(), false);
|
||||
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), req.getStatus(), req.getResult(), false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -102,6 +101,7 @@ public class TaskTrackerActor extends AbstractActor {
|
||||
|
||||
log.info("[TaskTrackerActor] instance(id={}) pre process finished.", req.getInstanceId());
|
||||
|
||||
// TODO:考虑放到 BroadcastTaskTracker 中去
|
||||
// 1. 生成集群子任务
|
||||
boolean success = req.isSuccess();
|
||||
if (success) {
|
||||
@ -137,12 +137,18 @@ public class TaskTrackerActor extends AbstractActor {
|
||||
}
|
||||
|
||||
// 原子创建,防止多实例的存在
|
||||
TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> {
|
||||
TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> new TaskTracker(req));
|
||||
}
|
||||
|
||||
JobInstanceInfo jobInstanceInfo = new JobInstanceInfo();
|
||||
BeanUtils.copyProperties(req, jobInstanceInfo);
|
||||
|
||||
return new TaskTracker(jobInstanceInfo);
|
||||
});
|
||||
/**
|
||||
* ProcessorTracker 心跳处理器
|
||||
*/
|
||||
private void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) {
|
||||
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
|
||||
if (taskTracker == null) {
|
||||
log.warn("[TaskTrackerActor] receive ProcessorTrackerStatusReportReq({}) but system can't find TaskTracker.", req);
|
||||
return;
|
||||
}
|
||||
taskTracker.receiveProcessorTrackerHeartbeat(req);
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
package com.github.kfcfans.oms.worker.common;
|
||||
|
||||
import com.github.kfcfans.oms.worker.sdk.TaskContext;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@ -13,7 +13,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
*/
|
||||
public class ThreadLocalStore {
|
||||
|
||||
public static final ThreadLocal<TaskContext> TASK_CONTEXT_THREAD_LOCAL = new ThreadLocal<>();
|
||||
public static final ThreadLocal<TaskDO> TASK_THREAD_LOCAL = new ThreadLocal<>();
|
||||
|
||||
public static final ThreadLocal<AtomicLong> TASK_ID_THREAD_LOCAL = new ThreadLocal<>();
|
||||
|
||||
|
@ -13,19 +13,12 @@ import lombok.Getter;
|
||||
@AllArgsConstructor
|
||||
public enum TaskStatus {
|
||||
|
||||
/* ******************* TaskTracker 专用 ******************* */
|
||||
WAITING_DISPATCH(1, "等待调度器调度"),
|
||||
DISPATCH_SUCCESS_WORKER_UNCHECK(2, "调度成功(但不保证worker收到)"),
|
||||
WORKER_RECEIVED(3, "worker接收成功,但未开始执行"),
|
||||
WORKER_PROCESSING(4, "worker正在执行"),
|
||||
WORKER_PROCESS_FAILED(5, "worker执行失败"),
|
||||
WORKER_PROCESS_SUCCESS(6, "worker执行成功"),
|
||||
|
||||
/* ******************* Worker 专用 ******************* */
|
||||
RECEIVE_SUCCESS(11, "成功接受任务但未开始执行(此时worker满载,暂时无法运行)"),
|
||||
PROCESSING(12, "执行中"),
|
||||
PROCESS_FAILED(13, "执行失败"),
|
||||
PROCESS_SUCCESS(14, "执行成功");
|
||||
WORKER_PROCESS_SUCCESS(6, "worker执行成功");
|
||||
|
||||
private int value;
|
||||
private String des;
|
||||
@ -38,14 +31,4 @@ public enum TaskStatus {
|
||||
}
|
||||
throw new IllegalArgumentException("no TaskStatus match the value of " + v);
|
||||
}
|
||||
|
||||
public static TaskStatus convertStatus(TaskStatus processorStatus) {
|
||||
switch (processorStatus) {
|
||||
case RECEIVE_SUCCESS: return WORKER_RECEIVED;
|
||||
case PROCESSING: return WORKER_PROCESSING;
|
||||
case PROCESS_FAILED: return WORKER_PROCESS_FAILED;
|
||||
case PROCESS_SUCCESS: return WORKER_PROCESS_SUCCESS;
|
||||
}
|
||||
throw new IllegalArgumentException(processorStatus.name() + " is not the processor status.");
|
||||
}
|
||||
}
|
||||
|
@ -9,9 +9,10 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
|
||||
import com.github.kfcfans.oms.worker.common.utils.SerializerUtils;
|
||||
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
|
||||
import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
|
||||
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
|
||||
import com.github.kfcfans.oms.worker.sdk.TaskContext;
|
||||
@ -20,7 +21,6 @@ import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
|
||||
import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
|
||||
@ -37,40 +37,44 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
@AllArgsConstructor
|
||||
public class ProcessorRunnable implements Runnable {
|
||||
|
||||
private final ActorSelection taskTrackerActor;
|
||||
|
||||
@Getter
|
||||
private final TaskTrackerStartTaskReq request;
|
||||
private InstanceInfo instanceInfo;
|
||||
private final ActorSelection taskTrackerActor;
|
||||
private final TaskDO task;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
String taskId = request.getTaskId();
|
||||
String instanceId = request.getInstanceId();
|
||||
String taskId = task.getTaskId();
|
||||
String instanceId = task.getInstanceId();
|
||||
|
||||
log.debug("[ProcessorRunnable] start to run task(instanceId={}&taskId={}&taskName={})", instanceId, taskId, request.getTaskName());
|
||||
log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
|
||||
|
||||
try {
|
||||
// 0. 完成执行上下文准备 & 上报执行信息
|
||||
TaskContext taskContext = new TaskContext();
|
||||
BeanUtils.copyProperties(request, taskContext);
|
||||
if (request.getSubTaskContent() != null && request.getSubTaskContent().length > 0) {
|
||||
taskContext.setSubTask(SerializerUtils.deSerialized(request.getSubTaskContent()));
|
||||
BeanUtils.copyProperties(task, taskContext);
|
||||
taskContext.setMaxRetryTimes(instanceInfo.getTaskRetryNum());
|
||||
taskContext.setCurrentRetryTimes(task.getFailedCnt());
|
||||
taskContext.setJobParams(instanceInfo.getJobParams());
|
||||
taskContext.setInstanceParams(instanceInfo.getInstanceParams());
|
||||
if (task.getTaskContent() != null && task.getTaskContent().length > 0) {
|
||||
taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent()));
|
||||
}
|
||||
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
|
||||
ThreadLocalStore.TASK_THREAD_LOCAL.set(task);
|
||||
ThreadLocalStore.TASK_ID_THREAD_LOCAL.set(new AtomicLong(0));
|
||||
|
||||
reportStatus(TaskStatus.PROCESSING, null);
|
||||
reportStatus(TaskStatus.WORKER_PROCESSING, null);
|
||||
|
||||
// 1. 获取 Processor
|
||||
BasicProcessor processor = getProcessor();
|
||||
if (processor == null) {
|
||||
reportStatus(TaskStatus.PROCESS_FAILED, "NO_PROCESSOR");
|
||||
reportStatus(TaskStatus.WORKER_PROCESS_FAILED, "NO_PROCESSOR");
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 根任务特殊处理
|
||||
ExecuteType executeType = ExecuteType.valueOf(request.getExecuteType());
|
||||
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
|
||||
if (TaskConstant.ROOT_TASK_ID.equals(taskId)) {
|
||||
|
||||
// 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task
|
||||
@ -86,7 +90,7 @@ public class ProcessorRunnable implements Runnable {
|
||||
spReq.setSuccess(processResult.isSuccess());
|
||||
spReq.setMsg(processResult.getMsg());
|
||||
}catch (Exception e) {
|
||||
log.warn("[ProcessorRunnable] broadcast task(instanceId={}) preProcess failed.", instanceId, e);
|
||||
log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
|
||||
spReq.setSuccess(false);
|
||||
spReq.setMsg(e.toString());
|
||||
}
|
||||
@ -102,7 +106,7 @@ public class ProcessorRunnable implements Runnable {
|
||||
if (TaskConstant.LAST_TASK_ID.equals(taskId)) {
|
||||
|
||||
Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
log.info("[ProcessorRunnable] instance(instanceId={})' last task(taskId={}) start to process.", instanceId, taskId);
|
||||
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
|
||||
|
||||
ProcessResult lastResult;
|
||||
Map<String, String> taskId2ResultMap = TaskPersistenceService.INSTANCE.getTaskId2ResultMap(instanceId);
|
||||
@ -124,13 +128,13 @@ public class ProcessorRunnable implements Runnable {
|
||||
}
|
||||
}catch (Exception e) {
|
||||
lastResult = new ProcessResult(false, e.toString());
|
||||
log.warn("[ProcessorRunnable] execute last task(instanceId={},taskId={}) failed.", instanceId, taskId, e);
|
||||
log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
|
||||
}
|
||||
|
||||
TaskStatus status = lastResult.isSuccess() ? TaskStatus.PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED;
|
||||
TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
|
||||
reportStatus(status, lastResult.getMsg());
|
||||
|
||||
log.info("[ProcessorRunnable] instance(instanceId={},taskId={})' last task execute successfully, using time: {}", instanceId, taskId, stopwatch);
|
||||
log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -140,21 +144,20 @@ public class ProcessorRunnable implements Runnable {
|
||||
try {
|
||||
processResult = processor.process(taskContext);
|
||||
}catch (Exception e) {
|
||||
log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e);
|
||||
log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e);
|
||||
processResult = new ProcessResult(false, e.toString());
|
||||
}
|
||||
reportStatus(processResult.isSuccess() ? TaskStatus.PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED, processResult.getMsg());
|
||||
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, processResult.getMsg());
|
||||
|
||||
}catch (Exception e) {
|
||||
log.error("[ProcessorRunnable] execute failed, please fix this bug!", e);
|
||||
log.error("[ProcessorRunnable-{}] execute failed, please fix this bug!", instanceId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private BasicProcessor getProcessor() {
|
||||
BasicProcessor processor = null;
|
||||
ProcessorType processorType = ProcessorType.valueOf(request.getProcessorType());
|
||||
|
||||
String processorInfo = request.getProcessorInfo();
|
||||
ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());
|
||||
String processorInfo = instanceInfo.getProcessorInfo();
|
||||
|
||||
switch (processorType) {
|
||||
case EMBEDDED_JAVA:
|
||||
@ -163,7 +166,7 @@ public class ProcessorRunnable implements Runnable {
|
||||
try {
|
||||
processor = SpringUtils.getBean(processorInfo);
|
||||
}catch (Exception e) {
|
||||
log.warn("[ProcessorRunnable] no spring bean of processor(className={}).", processorInfo);
|
||||
log.warn("[ProcessorRunnable-{}] no spring bean of processor(className={}).", instanceInfo, processorInfo);
|
||||
}
|
||||
}
|
||||
// 反射加载
|
||||
@ -181,8 +184,8 @@ public class ProcessorRunnable implements Runnable {
|
||||
private void reportStatus(TaskStatus status, String result) {
|
||||
ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq();
|
||||
|
||||
req.setInstanceId(request.getInstanceId());
|
||||
req.setTaskId(request.getTaskId());
|
||||
req.setInstanceId(task.getInstanceId());
|
||||
req.setTaskId(task.getTaskId());
|
||||
req.setStatus(status.getValue());
|
||||
req.setResult(result);
|
||||
|
||||
|
@ -7,16 +7,14 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
|
||||
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
|
||||
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
@ -30,23 +28,15 @@ public class ProcessorTracker {
|
||||
|
||||
// 记录创建时间
|
||||
private long startTime;
|
||||
private long jobTimeLimitMS;
|
||||
|
||||
// 记录该 Job 相关信息
|
||||
// 任务实例信息
|
||||
private InstanceInfo instanceInfo;
|
||||
// 冗余 instanceId,方便日志
|
||||
private String instanceId;
|
||||
private String executeType;
|
||||
private String processorType;
|
||||
private String processorInfo;
|
||||
private int threadConcurrency;
|
||||
private String jobParams;
|
||||
private String instanceParams;
|
||||
private int maxRetryTimes;
|
||||
|
||||
private String taskTrackerAddress;
|
||||
private ActorSelection taskTrackerActorRef;
|
||||
|
||||
private ThreadPoolExecutor threadPool;
|
||||
private static final int MAX_QUEUE_SIZE = 20;
|
||||
|
||||
/**
|
||||
* 创建 ProcessorTracker(其实就是创建了个执行用的线程池 T_T)
|
||||
@ -55,17 +45,9 @@ public class ProcessorTracker {
|
||||
|
||||
// 赋值
|
||||
this.startTime = System.currentTimeMillis();
|
||||
this.jobTimeLimitMS = request.getJobTimeLimitMS();
|
||||
this.instanceId = request.getInstanceId();
|
||||
this.executeType = request.getExecuteType();
|
||||
this.processorType = request.getProcessorType();
|
||||
this.processorInfo = request.getProcessorInfo();
|
||||
this.threadConcurrency = request.getThreadConcurrency();
|
||||
this.jobParams = request.getJobParams();
|
||||
this.instanceParams = request.getInstanceParams();
|
||||
this.maxRetryTimes = request.getMaxRetryTimes();
|
||||
this.instanceInfo = request.getInstanceInfo();
|
||||
this.instanceId = request.getInstanceInfo().getInstanceId();
|
||||
this.taskTrackerAddress = request.getTaskTrackerAddress();
|
||||
|
||||
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskTrackerAddress, AkkaConstant.Task_TRACKER_ACTOR_NAME);
|
||||
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
|
||||
|
||||
@ -75,49 +57,43 @@ public class ProcessorTracker {
|
||||
}
|
||||
|
||||
/**
|
||||
* 提交任务
|
||||
* 提交任务到线程池执行
|
||||
* 1.0版本:TaskTracker有任务就dispatch,导致 ProcessorTracker 本地可能堆积过多的任务,造成内存压力。为此 ProcessorTracker 在线程
|
||||
* 池队列堆积到一定程度时,会将数据持久化到DB,然后通过异步线程定时从数据库中取回任务,重新提交执行。
|
||||
* 联动:数据库的SPID设计、TaskStatus段落设计等,全部取消...
|
||||
* last commitId: 341953aceceafec0fbe7c3d9a3e26451656b945e
|
||||
* 2.0版本:ProcessorTracker定时向TaskTracker发送心跳消息,心跳消息中包含了当前线程池队列任务个数,TaskTracker根据ProcessorTracker
|
||||
* 的状态判断能否继续派发任务。因此,ProcessorTracker本地不会堆积过多任务,故删除 持久化机制 ╥﹏╥...!
|
||||
* @param newTask 需要提交到线程池执行的任务
|
||||
*/
|
||||
public void submitTask(TaskTrackerStartTaskReq newTaskReq) {
|
||||
public void submitTask(TaskDO newTask) {
|
||||
|
||||
// 1. 回复接受成功
|
||||
// 1. 设置值并提交执行
|
||||
newTask.setJobId(instanceInfo.getJobId());
|
||||
newTask.setInstanceId(instanceInfo.getInstanceId());
|
||||
newTask.setAddress(taskTrackerAddress);
|
||||
|
||||
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask);
|
||||
threadPool.submit(processorRunnable);
|
||||
|
||||
// 2. 回复接收成功
|
||||
ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
|
||||
BeanUtils.copyProperties(newTaskReq, reportReq);
|
||||
reportReq.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue());
|
||||
reportReq.setInstanceId(instanceId);
|
||||
reportReq.setTaskId(newTask.getTaskId());
|
||||
reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
|
||||
|
||||
reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
|
||||
taskTrackerActorRef.tell(reportReq, null);
|
||||
|
||||
// 2.1 内存控制,持久化
|
||||
if (threadPool.getQueue().size() > MAX_QUEUE_SIZE) {
|
||||
|
||||
TaskDO newTask = new TaskDO();
|
||||
BeanUtils.copyProperties(newTaskReq, newTask);
|
||||
newTask.setTaskContent(newTaskReq.getSubTaskContent());
|
||||
newTask.setAddress(newTaskReq.getTaskTrackerAddress());
|
||||
newTask.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue());
|
||||
newTask.setFailedCnt(newTaskReq.getCurrentRetryTimes());
|
||||
newTask.setCreatedTime(System.currentTimeMillis());
|
||||
newTask.setLastModifiedTime(System.currentTimeMillis());
|
||||
// 特殊处理 instanceId,防止冲突
|
||||
newTask.setInstanceId(getSPInstanceId(instanceId));
|
||||
|
||||
boolean save = TaskPersistenceService.INSTANCE.save(newTask);
|
||||
if (save) {
|
||||
log.debug("[ProcessorTracker] persistent task({}) succeed.", newTask);
|
||||
}else {
|
||||
log.warn("[ProcessorTracker] persistent task({}) failed.", newTask);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 2.2 提交执行
|
||||
ProcessorRunnable processorRunnable = new ProcessorRunnable(taskTrackerActorRef, newTaskReq);
|
||||
threadPool.submit(processorRunnable);
|
||||
log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
|
||||
instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务是否超时
|
||||
*/
|
||||
public boolean isTimeout() {
|
||||
return System.currentTimeMillis() - startTime > jobTimeLimitMS;
|
||||
return System.currentTimeMillis() - startTime > instanceInfo.getInstanceTimeoutMS();
|
||||
}
|
||||
|
||||
|
||||
@ -129,7 +105,7 @@ public class ProcessorTracker {
|
||||
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
|
||||
// 自定义线程池中线程名称
|
||||
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build();
|
||||
threadPool = new ThreadPoolExecutor(threadConcurrency, threadConcurrency, 60L, TimeUnit.SECONDS, queue, threadFactory);
|
||||
threadPool = new ThreadPoolExecutor(instanceInfo.getTaskRetryNum(), instanceInfo.getTaskRetryNum(), 60L, TimeUnit.SECONDS, queue, threadFactory);
|
||||
// 当没有任务执行时,允许销毁核心线程(即线程池最终存活线程个数可能为0)
|
||||
threadPool.allowCoreThreadTimeOut(true);
|
||||
}
|
||||
@ -141,69 +117,25 @@ public class ProcessorTracker {
|
||||
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build();
|
||||
ScheduledExecutorService timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);
|
||||
|
||||
timingPool.scheduleWithFixedDelay(new PoolStatusCheckRunnable(), 60, 10, TimeUnit.SECONDS);
|
||||
timingPool.scheduleAtFixedRate(new TimingStatusReportRunnable(), 0, 15, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 定期检查线程池运行状态(内存中的任务数量不足,则即使从数据库中获取并提交执行)
|
||||
* 定时向 TaskTracker 汇报(携带任务执行信息的心跳)
|
||||
*/
|
||||
private class PoolStatusCheckRunnable implements Runnable {
|
||||
private class TimingStatusReportRunnable implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
int queueSize = threadPool.getQueue().size();
|
||||
if (queueSize >= MAX_QUEUE_SIZE / 2) {
|
||||
return;
|
||||
}
|
||||
// 1. 查询数据库中等待执行的任务数量
|
||||
long waitingNum = threadPool.getQueue().size();
|
||||
|
||||
TaskPersistenceService taskPersistenceService = TaskPersistenceService.INSTANCE;
|
||||
|
||||
// 查询时也用特殊处理的 instanceId 查即可
|
||||
List<TaskDO> taskDOList =taskPersistenceService.getTaskByStatus(getSPInstanceId(instanceId), TaskStatus.RECEIVE_SUCCESS, MAX_QUEUE_SIZE / 2);
|
||||
|
||||
if (CollectionUtils.isEmpty(taskDOList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<String> deletedIds = Lists.newLinkedList();
|
||||
|
||||
log.debug("[ProcessorTracker] timing add task to thread pool.");
|
||||
|
||||
// 提交到线程池执行
|
||||
taskDOList.forEach(task -> {
|
||||
|
||||
// 还原 instanceId
|
||||
task.setInstanceId(instanceId);
|
||||
runTask(task);
|
||||
deletedIds.add(task.getTaskId());
|
||||
});
|
||||
|
||||
// 删除任务(需要使用特殊instanceId)
|
||||
taskPersistenceService.batchDelete(getSPInstanceId(instanceId), deletedIds);
|
||||
}
|
||||
|
||||
private void runTask(TaskDO task) {
|
||||
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
|
||||
BeanUtils.copyProperties(task, req);
|
||||
|
||||
req.setExecuteType(executeType);
|
||||
req.setProcessorType(processorType);
|
||||
req.setProcessorInfo(processorInfo);
|
||||
req.setTaskTrackerAddress(taskTrackerAddress);
|
||||
req.setJobParams(jobParams);
|
||||
req.setInstanceParams(instanceParams);
|
||||
req.setSubTaskContent(task.getTaskContent());
|
||||
req.setMaxRetryTimes(maxRetryTimes);
|
||||
req.setCurrentRetryTimes(task.getFailedCnt());
|
||||
|
||||
ProcessorRunnable processorRunnable = new ProcessorRunnable(taskTrackerActorRef, req);
|
||||
threadPool.submit(processorRunnable);
|
||||
// 2. 发送请求
|
||||
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceInfo.getInstanceId(), waitingNum);
|
||||
taskTrackerActorRef.tell(req, null);
|
||||
}
|
||||
}
|
||||
|
||||
private static String getSPInstanceId(String instanceId) {
|
||||
return "L" + instanceId;
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import akka.actor.ActorSelection;
|
||||
import akka.pattern.Patterns;
|
||||
import com.github.kfcfans.common.ExecuteType;
|
||||
import com.github.kfcfans.common.JobInstanceStatus;
|
||||
import com.github.kfcfans.common.request.ServerScheduleJobReq;
|
||||
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
|
||||
import com.github.kfcfans.common.response.AskResponse;
|
||||
import com.github.kfcfans.oms.worker.OhMyWorker;
|
||||
@ -15,14 +16,19 @@ import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
|
||||
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.ProcessorTrackerStatus;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStopInstanceReq;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import lombok.Getter;
|
||||
import lombok.ToString;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@ -33,6 +39,7 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* 负责管理 JobInstance 的运行,主要包括任务的派发(MR可能存在大量的任务)和状态的更新
|
||||
@ -44,44 +51,48 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@ToString
|
||||
public class TaskTracker {
|
||||
|
||||
protected long startTime;
|
||||
protected long jobTimeLimitMS;
|
||||
|
||||
private long startTime;
|
||||
// 任务实例信息
|
||||
protected JobInstanceInfo jobInstanceInfo;
|
||||
private InstanceInfo instanceInfo;
|
||||
|
||||
@Getter
|
||||
protected List<String> allWorkerAddress;
|
||||
private List<String> allWorkerAddress;
|
||||
private Map<String, ProcessorTrackerStatus> pTAddress2Status;
|
||||
|
||||
protected TaskPersistenceService taskPersistenceService;
|
||||
protected ScheduledExecutorService scheduledPool;
|
||||
// 数据库持久化服务
|
||||
private TaskPersistenceService taskPersistenceService;
|
||||
// 定时任务线程池
|
||||
private ScheduledExecutorService scheduledPool;
|
||||
|
||||
protected AtomicBoolean finished = new AtomicBoolean(false);
|
||||
private AtomicBoolean finished = new AtomicBoolean(false);
|
||||
|
||||
public TaskTracker(JobInstanceInfo jobInstanceInfo) {
|
||||
|
||||
log.info("[TaskTracker] start to create TaskTracker for instance({}).", jobInstanceInfo);
|
||||
public TaskTracker(ServerScheduleJobReq req) {
|
||||
|
||||
this.startTime = System.currentTimeMillis();
|
||||
this.jobTimeLimitMS = jobInstanceInfo.getTimeLimit();
|
||||
|
||||
this.jobInstanceInfo = jobInstanceInfo;
|
||||
// 1. 初始化成员变量
|
||||
this.instanceInfo = new InstanceInfo();
|
||||
BeanUtils.copyProperties(req, instanceInfo);
|
||||
allWorkerAddress = CommonSJ.commaSplitter.splitToList(req.getAllWorkerAddress());
|
||||
pTAddress2Status = Maps.newConcurrentMap();
|
||||
allWorkerAddress.forEach(ip -> {
|
||||
ProcessorTrackerStatus pts = new ProcessorTrackerStatus();
|
||||
pts.init(ip);
|
||||
pTAddress2Status.put(ip, pts);
|
||||
});
|
||||
this.taskPersistenceService = TaskPersistenceService.INSTANCE;
|
||||
|
||||
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build();
|
||||
this.scheduledPool = Executors.newScheduledThreadPool(2, factory);
|
||||
|
||||
allWorkerAddress = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getAllWorkerAddress());
|
||||
|
||||
// 持久化根任务
|
||||
// 2. 持久化根任务
|
||||
persistenceRootTask();
|
||||
|
||||
// 定时任务1:任务派发
|
||||
// 3. 启动定时任务(任务派发 & 状态检查)
|
||||
scheduledPool.scheduleWithFixedDelay(new DispatcherRunnable(), 0, 5, TimeUnit.SECONDS);
|
||||
|
||||
// 定时任务2:状态检查
|
||||
scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS);
|
||||
|
||||
log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", req.getInstanceId(), req);
|
||||
}
|
||||
|
||||
|
||||
@ -102,7 +113,7 @@ public class TaskTracker {
|
||||
Optional<TaskStatus> dbTaskStatusOpt = taskPersistenceService.getTaskStatus(instanceId, taskId);
|
||||
|
||||
if (!dbTaskStatusOpt.isPresent()) {
|
||||
log.warn("[TaskTracker] get task status failed when try to update new task status, current params is instanceId={},taskId={},newStatus={}.",
|
||||
log.warn("[TaskTracker-{}] query TaskStatus from DB failed when try to update new TaskStatus(taskId={},newStatus={}).",
|
||||
instanceId, taskId, newStatus);
|
||||
}
|
||||
|
||||
@ -110,7 +121,7 @@ public class TaskTracker {
|
||||
if (dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH).getValue() > newStatus) {
|
||||
// 必存在,但不怎么写,Java会警告...
|
||||
TaskStatus dbTaskStatus = dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH);
|
||||
log.warn("[TaskTracker] task(instanceId={},taskId={},dbStatus={},requestStatus={}) status conflict, taskTracker won't update the status.",
|
||||
log.warn("[TaskTracker-{}] task(taskId={},dbStatus={},requestStatus={}) status conflict, TaskTracker won't update the status.",
|
||||
instanceId, taskId, dbTaskStatus, nTaskStatus);
|
||||
return;
|
||||
}
|
||||
@ -119,11 +130,11 @@ public class TaskTracker {
|
||||
if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
|
||||
|
||||
// 数据库查询失败的话,就只重试一次
|
||||
int failedCnt = taskPersistenceService.getTaskFailedCnt(instanceId, taskId).orElse(jobInstanceInfo.getTaskRetryNum() - 1);
|
||||
if (failedCnt < jobInstanceInfo.getTaskRetryNum()) {
|
||||
int failedCnt = taskPersistenceService.getTaskFailedCnt(instanceId, taskId).orElse(instanceInfo.getTaskRetryNum() - 1);
|
||||
if (failedCnt < instanceInfo.getTaskRetryNum()) {
|
||||
boolean retryTask = taskPersistenceService.updateRetryTask(instanceId, taskId, failedCnt + 1);
|
||||
if (retryTask) {
|
||||
log.info("[TaskTracker] task(instanceId={},taskId={}) will have a retry.", instanceId, taskId);
|
||||
log.info("[TaskTracker-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, taskId);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -134,7 +145,7 @@ public class TaskTracker {
|
||||
}
|
||||
|
||||
if (!updateResult) {
|
||||
log.warn("[TaskTracker] update task status failed, this task(instanceId={}&taskId={}) may be processed repeatedly!", instanceId, taskId);
|
||||
log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -150,18 +161,28 @@ public class TaskTracker {
|
||||
|
||||
// 基础处理(多循环一次虽然有些浪费,但分布式执行中,这点耗时绝不是主要占比,忽略不计!)
|
||||
newTaskList.forEach(task -> {
|
||||
task.setJobId(jobInstanceInfo.getJobId());
|
||||
task.setInstanceId(jobInstanceInfo.getInstanceId());
|
||||
task.setJobId(instanceInfo.getJobId());
|
||||
task.setInstanceId(instanceInfo.getInstanceId());
|
||||
task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
|
||||
task.setFailedCnt(0);
|
||||
task.setLastModifiedTime(System.currentTimeMillis());
|
||||
task.setCreatedTime(System.currentTimeMillis());
|
||||
});
|
||||
|
||||
log.debug("[TaskTracker] JobInstance(id={}) add tasks: {}", jobInstanceInfo.getInstanceId(), newTaskList);
|
||||
log.debug("[TaskTracker-{}] receive new tasks: {}", instanceInfo.getInstanceId(), newTaskList);
|
||||
return taskPersistenceService.batchSave(newTaskList);
|
||||
}
|
||||
|
||||
/**
|
||||
* ProcessorTracker 上报健康状态
|
||||
*/
|
||||
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
|
||||
|
||||
ProcessorTrackerStatus processorTrackerStatus = pTAddress2Status.get(heartbeatReq.getIp());
|
||||
processorTrackerStatus.update(heartbeatReq);
|
||||
|
||||
}
|
||||
|
||||
public boolean finished() {
|
||||
return finished.get();
|
||||
}
|
||||
@ -170,7 +191,7 @@ public class TaskTracker {
|
||||
* 任务是否超时
|
||||
*/
|
||||
public boolean isTimeout() {
|
||||
return System.currentTimeMillis() - startTime > jobTimeLimitMS;
|
||||
return System.currentTimeMillis() - startTime > instanceInfo.getInstanceTimeoutMS();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -180,8 +201,8 @@ public class TaskTracker {
|
||||
|
||||
TaskDO rootTask = new TaskDO();
|
||||
rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
|
||||
rootTask.setJobId(jobInstanceInfo.getJobId());
|
||||
rootTask.setInstanceId(jobInstanceInfo.getInstanceId());
|
||||
rootTask.setJobId(instanceInfo.getJobId());
|
||||
rootTask.setInstanceId(instanceInfo.getInstanceId());
|
||||
rootTask.setTaskId(TaskConstant.ROOT_TASK_ID);
|
||||
rootTask.setFailedCnt(0);
|
||||
rootTask.setAddress(NetUtils.getLocalHost());
|
||||
@ -192,7 +213,7 @@ public class TaskTracker {
|
||||
if (!taskPersistenceService.save(rootTask)) {
|
||||
throw new RuntimeException("create root task failed.");
|
||||
}
|
||||
log.info("[TaskTracker] create root task successfully for instance(instanceId={}).", jobInstanceInfo.getInstanceId());
|
||||
log.info("[TaskTracker-{}] create root task successfully.", instanceInfo.getInstanceId());
|
||||
}
|
||||
|
||||
|
||||
@ -205,34 +226,68 @@ public class TaskTracker {
|
||||
*/
|
||||
private class DispatcherRunnable implements Runnable {
|
||||
|
||||
// 数据库查询限制,每次最多查询几个任务
|
||||
private static final int DB_QUERY_LIMIT = 100;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
taskPersistenceService.getTaskByStatus(jobInstanceInfo.getInstanceId(), TaskStatus.WAITING_DISPATCH, 100).forEach(task -> {
|
||||
try {
|
||||
// 构造 worker 执行请求
|
||||
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq(jobInstanceInfo, task);
|
||||
Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
String instanceId = instanceInfo.getInstanceId();
|
||||
|
||||
// 构造 akka 可访问节点路径
|
||||
String targetIP = task.getAddress();
|
||||
if (StringUtils.isEmpty(targetIP)) {
|
||||
targetIP = allWorkerAddress.get(ThreadLocalRandom.current().nextInt(allWorkerAddress.size()));
|
||||
// 1. 获取可以派发任务的 ProcessorTracker
|
||||
List<String> availablePtIps = Lists.newLinkedList();
|
||||
pTAddress2Status.forEach((ip, ptStatus) -> {
|
||||
if (ptStatus.available()) {
|
||||
availablePtIps.add(ip);
|
||||
}
|
||||
});
|
||||
|
||||
// 2. 没有可用 ProcessorTracker,本次不派发
|
||||
if (availablePtIps.isEmpty()) {
|
||||
log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 避免大查询,分批派发任务
|
||||
long currentDispatchNum = 0;
|
||||
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency();
|
||||
AtomicInteger index = new AtomicInteger(0);
|
||||
|
||||
// 4. 循环查询数据库,获取需要派发的任务
|
||||
while (maxDispatchNum > currentDispatchNum) {
|
||||
|
||||
List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, DB_QUERY_LIMIT);
|
||||
currentDispatchNum += needDispatchTasks.size();
|
||||
|
||||
needDispatchTasks.forEach(task -> {
|
||||
|
||||
TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task);
|
||||
|
||||
// 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address
|
||||
String ptAddress = task.getAddress();
|
||||
if (StringUtils.isEmpty(ptAddress)) {
|
||||
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
|
||||
}
|
||||
String targetPath = AkkaUtils.getAkkaRemotePath(targetIP, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
||||
ActorSelection targetActor = OhMyWorker.actorSystem.actorSelection(targetPath);
|
||||
|
||||
// 发送请求(Akka的tell是至少投递一次,经实验表明无法投递消息也不会报错...印度啊...)
|
||||
targetActor.tell(req, null);
|
||||
String ptActorPath = AkkaUtils.getAkkaRemotePath(ptAddress, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
||||
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath);
|
||||
ptActor.tell(startTaskReq, null);
|
||||
|
||||
// 更新 ProcessorTrackerStatus 状态
|
||||
pTAddress2Status.get(ptAddress).setDispatched(true);
|
||||
// 更新数据库(如果更新数据库失败,可能导致重复执行,先不处理)
|
||||
taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, null);
|
||||
|
||||
log.debug("[TaskTracker] dispatch task({instanceId={},taskId={},taskName={}} successfully.)", task.getInstanceId(), task.getTaskId(), task.getTaskName());
|
||||
}catch (Exception e) {
|
||||
// 调度失败,不修改数据库,下次重新随机派发给 remote actor
|
||||
log.warn("[TaskTracker] dispatch task({}) failed.", task);
|
||||
log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}} successfully.)", task.getInstanceId(), task.getTaskId(), task.getTaskName());
|
||||
});
|
||||
|
||||
// 数量不足 或 查询失败,则终止循环
|
||||
if (needDispatchTasks.size() < DB_QUERY_LIMIT) {
|
||||
return;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
log.debug("[TaskTracker-{}] dispatch {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch);
|
||||
}
|
||||
}
|
||||
|
||||
@ -243,10 +298,9 @@ public class TaskTracker {
|
||||
|
||||
private static final long TIME_OUT_MS = 5000;
|
||||
|
||||
|
||||
private void innerRun() {
|
||||
|
||||
final String instanceId = jobInstanceInfo.getInstanceId();
|
||||
final String instanceId = instanceInfo.getInstanceId();
|
||||
|
||||
// 1. 查询统计信息
|
||||
Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId);
|
||||
@ -261,10 +315,10 @@ public class TaskTracker {
|
||||
long finishedNum = succeedNum + failedNum;
|
||||
long unfinishedNum = waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum;
|
||||
|
||||
log.debug("[TaskTracker] status check result: {}", status2Num);
|
||||
log.debug("[TaskTracker-{}] status check result: {}", instanceId, status2Num);
|
||||
|
||||
TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
|
||||
req.setJobId(jobInstanceInfo.getJobId());
|
||||
req.setJobId(instanceInfo.getJobId());
|
||||
req.setInstanceId(instanceId);
|
||||
req.setTotalTaskNum(finishedNum + unfinishedNum);
|
||||
req.setSucceedTaskNum(succeedNum);
|
||||
@ -275,13 +329,13 @@ public class TaskTracker {
|
||||
if (unfinishedNum == 0) {
|
||||
|
||||
boolean finishedBoolean = true;
|
||||
ExecuteType executeType = ExecuteType.valueOf(jobInstanceInfo.getExecuteType());
|
||||
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
|
||||
|
||||
if (executeType == ExecuteType.STANDALONE) {
|
||||
|
||||
List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId);
|
||||
if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
|
||||
log.warn("[TaskTracker] there must have some bug in TaskTracker.");
|
||||
log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
|
||||
}else {
|
||||
resultTask = allTask.get(0);
|
||||
}
|
||||
@ -328,7 +382,7 @@ public class TaskTracker {
|
||||
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
|
||||
serverAccepted = askResponse.isSuccess();
|
||||
}catch (Exception e) {
|
||||
log.warn("[TaskTracker] report finished instance(id={}&result={}) failed.", instanceId, resultTask.getResult());
|
||||
log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, resultTask.getResult());
|
||||
}
|
||||
|
||||
// 服务器未接受上报,则等待下次重新上报
|
||||
@ -337,8 +391,8 @@ public class TaskTracker {
|
||||
}
|
||||
|
||||
// 服务器已经更新状态,任务已经执行完毕,开始释放所有资源
|
||||
log.info("[TaskTracker] instance(jobId={}&instanceId={}) process finished,result = {}, start to release resource...",
|
||||
jobInstanceInfo.getJobId(), instanceId, resultTask.getResult());
|
||||
log.info("[TaskTracker-{}] instance(jobId={}) process finished,result = {}, start to release resource...",
|
||||
instanceId, instanceInfo.getJobId(), resultTask.getResult());
|
||||
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
|
||||
stopRequest.setInstanceId(instanceId);
|
||||
allWorkerAddress.forEach(ptIP -> {
|
||||
@ -367,7 +421,7 @@ public class TaskTracker {
|
||||
long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
|
||||
if (elapsedTime > TIME_OUT_MS) {
|
||||
updateTaskStatus(instanceId, uncheckTask.getTaskId(), TaskStatus.WAITING_DISPATCH.getValue(), null, true);
|
||||
log.warn("[TaskTracker] task(instanceId={},taskId={}) try to dispatch again due to unreceived the response from processor tracker.",
|
||||
log.warn("[TaskTracker-{}] task(taskId={}) try to dispatch again due to unreceived the response from processor tracker.",
|
||||
instanceId, uncheckTask.getTaskId());
|
||||
}
|
||||
|
||||
@ -383,7 +437,7 @@ public class TaskTracker {
|
||||
try {
|
||||
innerRun();
|
||||
}catch (Exception e) {
|
||||
log.warn("[TaskTracker] status checker execute failed, please fix the bug (@tjq)!", e);
|
||||
log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceInfo.getInstanceId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package com.github.kfcfans.oms.worker.persistence;
|
||||
|
||||
import com.github.kfcfans.common.utils.CommonUtils;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
@ -59,7 +60,7 @@ public class TaskDAOImpl implements TaskDAO {
|
||||
@Override
|
||||
public boolean batchDelete(String instanceId, List<String> taskIds) throws SQLException {
|
||||
String deleteSQL = "delete from task_info where instance_id = '%s' and task_id in %s";
|
||||
String sql = String.format(deleteSQL, instanceId, getInStringCondition(taskIds));
|
||||
String sql = String.format(deleteSQL, instanceId, CommonUtils.getInStringCondition(taskIds));
|
||||
try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) {
|
||||
stat.executeUpdate(sql);
|
||||
return true;
|
||||
@ -180,12 +181,5 @@ public class TaskDAOImpl implements TaskDAO {
|
||||
ps.setLong(11, task.getLastModifiedTime());
|
||||
}
|
||||
|
||||
private static String getInStringCondition(Collection<String> collection) {
|
||||
if (CollectionUtils.isEmpty(collection)) {
|
||||
return "()";
|
||||
}
|
||||
StringBuilder sb = new StringBuilder(" ( ");
|
||||
collection.forEach(str -> sb.append("'").append(str).append("',"));
|
||||
return sb.replace(sb.length() -1, sb.length(), " ) ").toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -149,6 +149,25 @@ public class TaskPersistenceService {
|
||||
return Maps.newHashMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取等待执行的任务数量 (ProcessorTracker侧)
|
||||
* @param spInstanceId 特殊处理的 instanceId
|
||||
* @return 数量
|
||||
*/
|
||||
public Optional<Long> getWaitingToRunTaskNum(String spInstanceId) {
|
||||
try {
|
||||
return execute(() -> {
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setQueryContent("count(*) as num");
|
||||
Long num = Long.parseLong(taskDAO.simpleQueryPlus(query).get(0).get("NUM").toString());
|
||||
return Optional.of(num);
|
||||
});
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] getWaitingToRunTaskNum for instance(id={}) failed.", spInstanceId, e);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询 taskId -> taskResult,reduce阶段或postProcess 阶段使用
|
||||
*/
|
||||
@ -162,7 +181,7 @@ public class TaskPersistenceService {
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询任务状态(只查询 status,节约 I/O 资源)
|
||||
* 查询任务状态(只查询 status,节约 I/O 资源 -> 测试表明,效果惊人...磁盘I/O果然是重要瓶颈...)
|
||||
*/
|
||||
public Optional<TaskStatus> getTaskStatus(String instanceId, String taskId) {
|
||||
|
||||
@ -216,6 +235,29 @@ public class TaskPersistenceService {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量更新 Task 状态
|
||||
*/
|
||||
public boolean batchUpdateTaskStatus(String instanceId, List<String> taskIds, TaskStatus status, String result) {
|
||||
try {
|
||||
return execute(() -> {
|
||||
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setQueryCondition(String.format(" task_id in %s ", CommonUtils.getInStringCondition(taskIds)));
|
||||
|
||||
TaskDO updateEntity = new TaskDO();
|
||||
updateEntity.setStatus(status.getValue());
|
||||
updateEntity.setResult(result);
|
||||
return taskDAO.simpleUpdate(query, updateEntity);
|
||||
});
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskPersistenceService] updateTaskStatus failed, instanceId={},taskIds={},status={},result={}.",
|
||||
instanceId, taskIds, status, result, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean updateRetryTask(String instanceId, String taskId, int failedCnt) {
|
||||
|
||||
try {
|
||||
|
@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.pojo.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 被调度执行的任务实例详情
|
||||
*
|
||||
@ -9,25 +11,40 @@ import lombok.Data;
|
||||
* @since 2020/3/16
|
||||
*/
|
||||
@Data
|
||||
public class JobInstanceInfo {
|
||||
public class InstanceInfo implements Serializable {
|
||||
|
||||
/**
|
||||
* 基础信息
|
||||
*/
|
||||
private String jobId;
|
||||
private String instanceId;
|
||||
|
||||
/**
|
||||
* 任务执行处理器信息
|
||||
*/
|
||||
// 任务执行类型,单机、广播、MR
|
||||
private String executeType;
|
||||
// 处理器类型(JavaBean、Jar、脚本等)
|
||||
private String processorType;
|
||||
// 处理器信息
|
||||
private String processorInfo;
|
||||
// 任务执行时间限制,单位毫秒
|
||||
private long timeLimit;
|
||||
// 可用处理器地址,可能多值,逗号分隔
|
||||
private String allWorkerAddress;
|
||||
|
||||
/**
|
||||
* 超时时间
|
||||
*/
|
||||
// 整个任务的总体超时时间
|
||||
private long instanceTimeoutMS;
|
||||
// Task的超时时间
|
||||
private long taskTimeoutMS;
|
||||
|
||||
/**
|
||||
* 任务运行参数
|
||||
*/
|
||||
// 任务级别的参数,相当于类的static变量
|
||||
private String jobParams;
|
||||
// 实例级别的参数,相当于类的普通变量
|
||||
private String instanceParams;
|
||||
|
||||
/* *********************** Map/MapReduce 任务专用 *********************** */
|
||||
|
||||
// 每台机器的处理线程数上限
|
||||
private int threadConcurrency;
|
@ -0,0 +1,80 @@
|
||||
package com.github.kfcfans.oms.worker.pojo.model;
|
||||
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* ProcessorTracker 的状态
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/3/27
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class ProcessorTrackerStatus {
|
||||
|
||||
private static final int DISPATCH_THRESHOLD = 20;
|
||||
private static final int HEARTBEAT_TIMEOUT_MS = 60000;
|
||||
|
||||
// 冗余存储一份 IP 地址
|
||||
private String ip;
|
||||
// 上次活跃时间
|
||||
private long lastActiveTime;
|
||||
// 等待执行任务数
|
||||
private long remainTaskNum;
|
||||
// 是否被派发过任务
|
||||
private boolean dispatched;
|
||||
|
||||
public void init(String ip) {
|
||||
this.ip = ip;
|
||||
this.lastActiveTime = System.currentTimeMillis();
|
||||
this.remainTaskNum = 0;
|
||||
this.dispatched = false;
|
||||
}
|
||||
|
||||
public void update(ProcessorTrackerStatusReportReq req) {
|
||||
|
||||
// 延迟到达的请求,直接忽略
|
||||
if (req.getTime() <= lastActiveTime) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.ip = req.getIp();
|
||||
this.lastActiveTime = req.getTime();
|
||||
this.remainTaskNum = req.getRemainTaskNum();
|
||||
this.dispatched = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否可用
|
||||
*/
|
||||
public boolean available() {
|
||||
|
||||
// 未曾派发过,默认可用
|
||||
if (!dispatched) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 长时间未收到心跳消息,则不可用
|
||||
if (isTimeout()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 留有过多待处理任务,则不可用
|
||||
if (remainTaskNum >= DISPATCH_THRESHOLD) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO:后续考虑加上机器健康度等信息
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否超时(超过一定时间没有收到心跳)
|
||||
*/
|
||||
public boolean isTimeout() {
|
||||
return System.currentTimeMillis() - lastActiveTime > HEARTBEAT_TIMEOUT_MS;
|
||||
}
|
||||
}
|
@ -2,7 +2,7 @@ package com.github.kfcfans.oms.worker.pojo.request;
|
||||
|
||||
import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
|
||||
import com.github.kfcfans.oms.worker.common.utils.SerializerUtils;
|
||||
import com.github.kfcfans.oms.worker.sdk.TaskContext;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
@ -34,15 +34,15 @@ public class ProcessorMapTaskRequest implements Serializable {
|
||||
private byte[] taskContent;
|
||||
}
|
||||
|
||||
public ProcessorMapTaskRequest(TaskContext taskContext, List<?> subTaskList, String taskName) {
|
||||
public ProcessorMapTaskRequest(TaskDO parentTask, List<?> subTaskList, String taskName) {
|
||||
|
||||
this.instanceId = taskContext.getInstanceId();
|
||||
this.instanceId = parentTask.getInstanceId();
|
||||
this.taskName = taskName;
|
||||
this.subTasks = Lists.newLinkedList();
|
||||
|
||||
subTaskList.forEach(subTask -> {
|
||||
// 同一个 Task 内部可能多次 Map,因此还是要确保线程级别的唯一
|
||||
String subTaskId = taskContext.getTaskId() + "." + ThreadLocalStore.TASK_ID_THREAD_LOCAL.get().getAndIncrement();
|
||||
String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.TASK_ID_THREAD_LOCAL.get().getAndIncrement();
|
||||
// 写入类名,方便反序列化
|
||||
subTasks.add(new SubTask(subTaskId, SerializerUtils.serialize(subTask)));
|
||||
});
|
||||
|
@ -0,0 +1,41 @@
|
||||
package com.github.kfcfans.oms.worker.pojo.request;
|
||||
|
||||
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* ProcessorTracker 定时向 TaskTracker 上报健康状态
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/3/27
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class ProcessorTrackerStatusReportReq {
|
||||
|
||||
private String instanceId;
|
||||
|
||||
/**
|
||||
* 请求发起时间
|
||||
*/
|
||||
private long time;
|
||||
|
||||
/**
|
||||
* 等待执行的任务数量,内存队列数 + 数据库持久数
|
||||
*/
|
||||
private long remainTaskNum;
|
||||
|
||||
/**
|
||||
* 本机地址
|
||||
*/
|
||||
private String ip;
|
||||
|
||||
public ProcessorTrackerStatusReportReq(String instanceId, long remainTaskNum) {
|
||||
this.instanceId = instanceId;
|
||||
this.remainTaskNum = remainTaskNum;
|
||||
|
||||
this.time = System.currentTimeMillis();
|
||||
this.ip = NetUtils.getLocalHost();
|
||||
}
|
||||
}
|
@ -2,7 +2,7 @@ package com.github.kfcfans.oms.worker.pojo.request;
|
||||
|
||||
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
@ -10,7 +10,7 @@ import lombok.Setter;
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* JobTracker 派发 task 进行执行
|
||||
* TaskTracker 派发 task 进行执行
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/3/17
|
||||
@ -20,52 +20,26 @@ import java.io.Serializable;
|
||||
@NoArgsConstructor
|
||||
public class TaskTrackerStartTaskReq implements Serializable {
|
||||
|
||||
private String jobId;
|
||||
private String instanceId;
|
||||
// 任务执行类型,单机、广播、MR
|
||||
private String executeType;
|
||||
// 处理器类型(JavaBean、Jar、脚本等)
|
||||
private String processorType;
|
||||
// 处理器信息
|
||||
private String processorInfo;
|
||||
// 并发计算线程数
|
||||
private int threadConcurrency;
|
||||
// TaskTracker 地址
|
||||
private String taskTrackerAddress;
|
||||
// 任务超时时间
|
||||
private long jobTimeLimitMS;
|
||||
|
||||
private String jobParams;
|
||||
private String instanceParams;
|
||||
private InstanceInfo instanceInfo;
|
||||
|
||||
private String taskId;
|
||||
private String taskName;
|
||||
private byte[] subTaskContent;
|
||||
// 子任务允许的重试次数
|
||||
private int maxRetryTimes;
|
||||
private byte[] taskContent;
|
||||
// 子任务当前重试次数
|
||||
private int currentRetryTimes;
|
||||
private int taskCurrentRetryNums;
|
||||
|
||||
|
||||
public TaskTrackerStartTaskReq(JobInstanceInfo instanceInfo, TaskDO task) {
|
||||
public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task) {
|
||||
|
||||
jobId = instanceInfo.getJobId();
|
||||
instanceId = instanceInfo.getInstanceId();
|
||||
processorType = instanceInfo.getProcessorType();
|
||||
processorInfo = instanceInfo.getProcessorInfo();
|
||||
threadConcurrency = instanceInfo.getThreadConcurrency();
|
||||
executeType = instanceInfo.getExecuteType();
|
||||
taskTrackerAddress = NetUtils.getLocalHost();
|
||||
jobTimeLimitMS = instanceInfo.getTimeLimit();
|
||||
this.taskTrackerAddress = NetUtils.getLocalHost();
|
||||
this.instanceInfo = instanceInfo;
|
||||
|
||||
jobParams = instanceInfo.getJobParams();
|
||||
instanceParams = instanceInfo.getInstanceParams();
|
||||
this.taskId = task.getTaskId();
|
||||
this.taskName = task.getTaskName();
|
||||
this.taskContent = task.getTaskContent();
|
||||
|
||||
taskId = task.getTaskId();
|
||||
taskName = task.getTaskName();
|
||||
subTaskContent = task.getTaskContent();
|
||||
|
||||
maxRetryTimes = instanceInfo.getTaskRetryNum();
|
||||
currentRetryTimes = task.getFailedCnt();
|
||||
this.taskCurrentRetryNums = task.getFailedCnt();
|
||||
}
|
||||
}
|
||||
|
@ -31,7 +31,6 @@ public class TaskContext {
|
||||
|
||||
private Object subTask;
|
||||
|
||||
private String taskTrackerAddress;
|
||||
|
||||
|
||||
public String getDescription() {
|
||||
@ -40,8 +39,7 @@ public class TaskContext {
|
||||
", taskId='" + taskId + '\'' +
|
||||
", taskName='" + taskName + '\'' +
|
||||
", jobParams='" + jobParams + '\'' +
|
||||
", instanceParams='" + instanceParams + '\'' +
|
||||
", taskTrackerAddress='" + taskTrackerAddress;
|
||||
", instanceParams='" + instanceParams;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7,6 +7,7 @@ import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
|
||||
import com.github.kfcfans.common.AkkaConstant;
|
||||
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
|
||||
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest;
|
||||
import com.github.kfcfans.common.response.AskResponse;
|
||||
import com.github.kfcfans.oms.worker.sdk.TaskContext;
|
||||
@ -48,15 +49,15 @@ public abstract class MapReduceProcessor implements BasicProcessor {
|
||||
log.warn("[MapReduceProcessor] map task size is too large, network maybe overload... please try to split the tasks.");
|
||||
}
|
||||
|
||||
TaskContext taskContext = ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.get();
|
||||
TaskDO task = ThreadLocalStore.TASK_THREAD_LOCAL.get();
|
||||
|
||||
// 1. 构造请求
|
||||
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(taskContext, taskList, taskName);
|
||||
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
|
||||
|
||||
// 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常)
|
||||
boolean requestSucceed = false;
|
||||
try {
|
||||
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskContext.getTaskTrackerAddress(), AkkaConstant.Task_TRACKER_ACTOR_NAME);
|
||||
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(task.getAddress(), AkkaConstant.Task_TRACKER_ACTOR_NAME);
|
||||
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
|
||||
CompletionStage<Object> requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS));
|
||||
AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||
@ -73,8 +74,8 @@ public abstract class MapReduceProcessor implements BasicProcessor {
|
||||
}
|
||||
|
||||
public boolean isRootTask() {
|
||||
TaskContext taskContext = ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.get();
|
||||
return TaskConstant.ROOT_TASK_ID.equals(taskContext.getTaskId());
|
||||
TaskDO task = ThreadLocalStore.TASK_THREAD_LOCAL.get();
|
||||
return TaskConstant.ROOT_TASK_ID.equals(task.getTaskId());
|
||||
}
|
||||
|
||||
public abstract ProcessResult reduce(TaskContext taskContext, Map<String, String> taskId2Result);
|
||||
|
@ -32,7 +32,7 @@ public class PersistenceServiceTest {
|
||||
task.setInstanceId("10086" + ThreadLocalRandom.current().nextInt(2));
|
||||
task.setTaskId(i + "");
|
||||
task.setFailedCnt(0);
|
||||
task.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue());
|
||||
task.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
|
||||
task.setTaskName("ROOT_TASK");
|
||||
task.setLastModifiedTime(System.currentTimeMillis());
|
||||
task.setCreatedTime(System.currentTimeMillis());
|
||||
|
@ -9,6 +9,7 @@ import com.github.kfcfans.oms.worker.common.OhMyConfig;
|
||||
import com.github.kfcfans.common.AkkaConstant;
|
||||
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
|
||||
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
@ -60,20 +61,30 @@ public class ProcessorTrackerTest {
|
||||
}
|
||||
|
||||
private static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) {
|
||||
|
||||
InstanceInfo instanceInfo = new InstanceInfo();
|
||||
|
||||
instanceInfo.setJobId("1");
|
||||
instanceInfo.setInstanceId("10086");
|
||||
|
||||
instanceInfo.setExecuteType(ExecuteType.STANDALONE.name());
|
||||
instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
|
||||
instanceInfo.setProcessorInfo(processor);
|
||||
|
||||
instanceInfo.setInstanceTimeoutMS(500000);
|
||||
instanceInfo.setTaskTimeoutMS(5000000);
|
||||
|
||||
instanceInfo.setThreadConcurrency(5);
|
||||
instanceInfo.setTaskRetryNum(3);
|
||||
|
||||
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
|
||||
req.setJobId("1");
|
||||
req.setInstanceId("10086");
|
||||
|
||||
req.setTaskTrackerAddress(NetUtils.getLocalHost());
|
||||
req.setInstanceInfo(instanceInfo);
|
||||
|
||||
req.setTaskId("0");
|
||||
req.setTaskName("ROOT_TASK");
|
||||
req.setMaxRetryTimes(3);
|
||||
req.setCurrentRetryTimes(0);
|
||||
|
||||
req.setExecuteType(ExecuteType.STANDALONE.name());
|
||||
req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
|
||||
req.setProcessorInfo(processor);
|
||||
req.setThreadConcurrency(5);
|
||||
req.setTaskTrackerAddress(NetUtils.getLocalHost());
|
||||
req.setJobTimeLimitMS(123132);
|
||||
req.setTaskCurrentRetryNums(0);
|
||||
|
||||
return req;
|
||||
}
|
||||
|
@ -63,7 +63,8 @@ public class TaskTrackerTest {
|
||||
req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
|
||||
req.setTaskRetryNum(3);
|
||||
req.setThreadConcurrency(20);
|
||||
req.setTimeLimit(500000);
|
||||
req.setInstanceTimeoutMS(500000);
|
||||
req.setTaskTimeoutMS(500000);
|
||||
|
||||
switch (executeType) {
|
||||
case STANDALONE:
|
||||
|
Loading…
x
Reference in New Issue
Block a user