finished the simple ProcessorTracker

This commit is contained in:
tjq 2020-03-23 13:16:08 +08:00
parent fafb708c7a
commit 2e348b411d
23 changed files with 490 additions and 144 deletions

View File

@ -0,0 +1,17 @@
package com.github.kfcfans.common;
import lombok.AllArgsConstructor;
/**
* 处理器类型
*
* @author tjq
* @since 2020/3/23
*/
@AllArgsConstructor
public enum ProcessorType {
EMBEDDED_JAVA("内置Java对象");
private String des;
}

View File

@ -20,7 +20,6 @@ public class TaskTrackerReportInstanceStatusReq {
/* ********* 统计信息 ********* */
private long totalTaskNum;
private long runningTaskNum;
private long succeedTaskNum;
private long failedTaskNum;
}

View File

@ -0,0 +1,34 @@
package com.github.kfcfans.oms.worker.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTracker;
import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTrackerPool;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import lombok.extern.slf4j.Slf4j;
/**
* 普通计算节点处理来自 JobTracker 的请求
*
* @author tjq
* @since 2020/3/17
*/
@Slf4j
public class ProcessorTrackerActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(TaskTrackerStartTaskReq.class, this::onReceiveTaskTrackerStartTaskReq)
.matchAny(obj -> log.warn("[ProcessorTrackerActor] receive unknown request: {}.", obj))
.build();
}
/**
* 处理来自TaskTracker的task执行请求
*/
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
String instanceId = req.getInstanceId();
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> new ProcessorTracker(req.getThreadConcurrency()));
processorTracker.submitTask(req, getSender());
}
}

View File

@ -11,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
* @since 2020/3/17
*/
@Slf4j
public class JobTrackerActor extends AbstractActor {
public class TaskTrackerActor extends AbstractActor {
@Override
public Receive createReceive() {

View File

@ -1,16 +0,0 @@
package com.github.kfcfans.oms.worker.actors;
import akka.actor.AbstractActor;
/**
* 普通计算节点处理来自 JobTracker 的请求
*
* @author tjq
* @since 2020/3/17
*/
public class WorkerActor extends AbstractActor {
@Override
public Receive createReceive() {
return null;
}
}

View File

@ -0,0 +1,17 @@
package com.github.kfcfans.oms.worker.core.classloader;
import java.net.URL;
import java.net.URLClassLoader;
/**
* 类加载器
*
* @author tjq
* @since 2020/3/23
*/
public class OhMyClassLoader extends URLClassLoader {
public OhMyClassLoader(URL[] urls, ClassLoader parent) {
super(urls, parent);
}
}

View File

@ -0,0 +1,69 @@
package com.github.kfcfans.oms.worker.core.classloader;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.Map;
/**
* 处理器工厂
*
* @author tjq
* @since 2020/3/23
*/
@Slf4j
public class ProcessorBeanFactory {
private final OhMyClassLoader ohMyClassLoader;
// key用来防止不同jar包同名类的冲突 -> (className -> Processor)
private final Map<String, Map<String, BasicProcessor>> cache;
private static final String LOCAL_KEY = "local";
private static volatile ProcessorBeanFactory processorBeanFactory;
public ProcessorBeanFactory() {
// 1. 初始化类加载器
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
URL path = contextClassLoader.getResource("/");
ohMyClassLoader = new OhMyClassLoader(new URL[]{path}, contextClassLoader);
// 2. 初始化对象缓存
cache = Maps.newConcurrentMap();
Map<String, BasicProcessor> className2Processor = Maps.newConcurrentMap();
cache.put(LOCAL_KEY, className2Processor);
}
public BasicProcessor getLocalProcessor(String className) {
return cache.get(LOCAL_KEY).computeIfAbsent(className, ignore -> {
try {
Class<?> clz = ohMyClassLoader.loadClass(className);
BasicProcessor processor = (BasicProcessor) clz.newInstance();
processor.init();
return processor;
}catch (Exception e) {
log.error("[ProcessorBeanFactory] load local Processor(className = {}) failed.", className, e);
}
return null;
});
}
public static ProcessorBeanFactory getInstance() {
if (processorBeanFactory != null) {
return processorBeanFactory;
}
synchronized (ProcessorBeanFactory.class) {
if (processorBeanFactory == null) {
processorBeanFactory = new ProcessorBeanFactory();
}
}
return processorBeanFactory;
}
}

View File

@ -0,0 +1,134 @@
package com.github.kfcfans.oms.worker.core.executor;
import akka.actor.ActorRef;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory;
import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
/**
* Processor 执行器
*
* @author tjq
* @since 2020/3/23
*/
@Slf4j
@AllArgsConstructor
public class ProcessorRunnable implements Runnable {
private final ActorRef taskTrackerActor;
@Getter
private final TaskTrackerStartTaskReq request;
@Override
public void run() {
// 0. 创建回复
ProcessorReportTaskStatusReq reportStatus = new ProcessorReportTaskStatusReq();
BeanUtils.copyProperties(request, reportStatus);
// 1. 获取 Processor
BasicProcessor processor = getProcessor();
if (processor == null) {
reportStatus.setStatus(TaskStatus.PROCESS_FAILED.getValue());
reportStatus.setResult("NO_PROCESSOR");
taskTrackerActor.tell(reportStatus, null);
return;
}
// 2. 根任务特殊处理
ExecuteType executeType = ExecuteType.valueOf(request.getExecuteType());
if (TaskConstant.ROOT_TASK_ID.equals(request.getTaskId())) {
// 广播执行先选本机执行 preProcess完成后TaskTracker再为所有Worker生成子Task
if (executeType == ExecuteType.BROADCAST) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
BeanUtils.copyProperties(request, reportStatus);
try {
ProcessResult processResult = broadcastProcessor.preProcess();
spReq.setSuccess(processResult.isSuccess());
spReq.setMsg(processResult.getMsg());
}catch (Exception e) {
log.warn("[ProcessorRunnable] broadcast task(jobId={}) preProcess failed.", request.getJobId(), e);
spReq.setSuccess(false);
spReq.setMsg(e.toString());
}
taskTrackerActor.tell(spReq, null);
}
}
// 3. 通知 TaskTracker 任务开始运行
reportStatus.setStatus(TaskStatus.PROCESSING.getValue());
taskTrackerActor.tell(reportStatus, null);
// 4. 完成提交前准备工作
ProcessResult processResult;
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(request, taskContext);
taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent()));
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
// 5. 正式提交运行
ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
BeanUtils.copyProperties(request, reportReq);
try {
processResult = processor.process(taskContext);
reportReq.setResult(processResult.getMsg());
if (processResult.isSuccess()) {
reportReq.setStatus(TaskStatus.PROCESS_SUCCESS.getValue());
}else {
reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue());
}
}catch (Exception e) {
log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e);
reportReq.setResult(e.toString());
reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue());
}
taskTrackerActor.tell(reportReq, null);
}
private BasicProcessor getProcessor() {
BasicProcessor processor = null;
ProcessorType processorType = ProcessorType.valueOf(request.getProcessorType());
String processorInfo = request.getProcessorInfo();
switch (processorType) {
case EMBEDDED_JAVA:
// 先使用 Spring 加载
if (SpringUtils.supportSpringBean()) {
try {
processor = SpringUtils.getBean(processorInfo);
}catch (Exception e) {
log.warn("[ProcessorRunnable] no spring bean of processor(className={}).", processorInfo);
}
}
// 反射加载
if (processor == null) {
processor = ProcessorBeanFactory.getInstance().getLocalProcessor(processorInfo);
}
}
return processor;
}
}

View File

@ -0,0 +1,45 @@
package com.github.kfcfans.oms.worker.core.tracker.processor;
import akka.actor.ActorRef;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.beans.BeanUtils;
import java.util.concurrent.*;
/**
* 负责管理 Processor 的执行
*
* @author tjq
* @since 2020/3/20
*/
public class ProcessorTracker {
private ExecutorService threadPool;
public ProcessorTracker(int threadConcurrency) {
// 初始化运行用的线程池
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build();
RejectedProcessorHandler rejectHandler = new RejectedProcessorHandler(TaskPersistenceService.INSTANCE);
threadPool = new ThreadPoolExecutor(threadConcurrency, threadConcurrency, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectHandler);
}
public void submitTask(TaskTrackerStartTaskReq newTaskReq, ActorRef taskTrackerActorRef) {
// 1. 回复接受成功
ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
BeanUtils.copyProperties(newTaskReq, reportReq);
reportReq.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue());
taskTrackerActorRef.tell(reportReq, null);
// 2. 提交线程池执行
ProcessorRunnable processorRunnable = new ProcessorRunnable(taskTrackerActorRef, newTaskReq);
threadPool.submit(processorRunnable);
}
}

View File

@ -0,0 +1,22 @@
package com.github.kfcfans.oms.worker.core.tracker.processor;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.function.Function;
/**
* 持久 Processor 对象
* instanceId -> Processor
*
* @author tjq
* @since 2020/3/20
*/
public class ProcessorTrackerPool {
private static final Map<String, ProcessorTracker> instanceId2ProcessorTracker = Maps.newConcurrentMap();
public static ProcessorTracker getProcessorTracker(String instanceId, Function<String, ProcessorTracker> creator) {
return instanceId2ProcessorTracker.computeIfAbsent(instanceId, creator);
}
}

View File

@ -0,0 +1,51 @@
package com.github.kfcfans.oms.worker.core.tracker.processor;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池拒绝策略 -> 持久化到本地H2数据库
* 出于内存占用考虑线程池的阻塞队列容量不大大量子任务涌入时需要持久化到本地数据库
* 第一版先直接写H2如果发现有性能问题再转变为 内存队列 + 批量写入 模式
*
* @author tjq
* @since 2020/3/23
*/
@Slf4j
@AllArgsConstructor
public class RejectedProcessorHandler implements RejectedExecutionHandler {
private final TaskPersistenceService taskPersistenceService;
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
ProcessorRunnable processorRunnable = (ProcessorRunnable) r;
TaskTrackerStartTaskReq startTaskReq = processorRunnable.getRequest();
TaskDO newTask = new TaskDO();
BeanUtils.copyProperties(startTaskReq, newTask);
newTask.setTaskContent(startTaskReq.getSubTaskContent());
newTask.setAddress(startTaskReq.getTaskTrackerAddress());
newTask.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue());
newTask.setFailedCnt(0);
newTask.setCreatedTime(System.currentTimeMillis());
newTask.setLastModifiedTime(System.currentTimeMillis());
boolean save = taskPersistenceService.save(newTask);
if (save) {
log.debug("[RejectedProcessorHandler] persistent task({}) succeed.", newTask);
}else {
log.warn("[RejectedProcessorHandler] persistent task({}) failed.", newTask);
}
}
}

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.worker.tracker;
package com.github.kfcfans.oms.worker.core.tracker.task;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
@ -16,7 +16,7 @@ import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.oms.worker.pojo.request.WorkerReportTaskStatusReq;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* 负责管理 JobInstance 的运行主要包括任务的派发MR可能存在大量的任务和状态的更新
@ -75,7 +74,7 @@ public abstract class TaskTracker {
*/
public abstract void dispatch();
public void updateTaskStatus(WorkerReportTaskStatusReq req) {
public void updateTaskStatus(ProcessorReportTaskStatusReq req) {
TaskStatus taskStatus = TaskStatus.of(req.getStatus());
// 持久化失败则重试一次本地数据库操作几乎可以认为可靠......
@ -98,44 +97,18 @@ public abstract class TaskTracker {
*/
private void persistenceRootTask() {
ExecuteType executeType = ExecuteType.valueOf(jobInstanceInfo.getExecuteType());
boolean persistenceResult;
TaskDO rootTask = new TaskDO();
rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
rootTask.setJobId(jobInstanceInfo.getJobId());
rootTask.setInstanceId(jobInstanceInfo.getInstanceId());
rootTask.setTaskId(TaskConstant.ROOT_TASK_ID);
rootTask.setFailedCnt(0);
rootTask.setAddress(NetUtils.getLocalHost());
rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
rootTask.setCreatedTime(System.currentTimeMillis());
rootTask.setCreatedTime(System.currentTimeMillis());
// 单机MR模型下根任务模型本机直接执行JobTracker一般为负载最小的机器且MR的根任务通常伴随着 map 操作本机执行可以有效减少网络I/O开销
if (executeType != ExecuteType.BROADCAST) {
TaskDO rootTask = new TaskDO();
rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
rootTask.setJobId(jobInstanceInfo.getJobId());
rootTask.setInstanceId(jobInstanceInfo.getInstanceId());
rootTask.setTaskId(TaskConstant.ROOT_TASK_ID);
rootTask.setFailedCnt(0);
rootTask.setAddress(NetUtils.getLocalHost());
rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
rootTask.setCreatedTime(System.currentTimeMillis());
rootTask.setCreatedTime(System.currentTimeMillis());
persistenceResult = taskPersistenceService.save(rootTask);
}else {
List<TaskDO> taskList = Lists.newLinkedList();
List<String> addrList = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getAllWorkerAddress());
for (int i = 0; i < addrList.size(); i++) {
TaskDO task = new TaskDO();
task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
task.setJobId(jobInstanceInfo.getJobId());
task.setInstanceId(jobInstanceInfo.getInstanceId());
task.setTaskId(String.valueOf(i));
task.setAddress(addrList.get(i));
task.setFailedCnt(0);
task.setTaskName(TaskConstant.ROOT_TASK_NAME);
task.setCreatedTime(System.currentTimeMillis());
task.setCreatedTime(System.currentTimeMillis());
taskList.add(task);
}
persistenceResult = taskPersistenceService.batchSave(taskList);
}
if (!persistenceResult) {
if (!taskPersistenceService.save(rootTask)) {
throw new RuntimeException("create root task failed.");
}
}
@ -201,6 +174,9 @@ public abstract class TaskTracker {
log.debug("[TaskTracker] status check result({})", status2Num);
TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
req.setTotalTaskNum(finishedNum + unfinishedNum);
req.setSucceedTaskNum(succeedNum);
req.setFailedTaskNum(failedNum);
// 2. 如果未完成任务数为0上报服务器
if (unfinishedNum == 0) {
@ -214,6 +190,8 @@ public abstract class TaskTracker {
// 特殊处理MapReduce任务(执行reduce)
// 特殊处理广播任务任务执行postProcess
// 通知 ProcessorTracker 释放资源PT释放资源前
}else {
}

View File

@ -26,7 +26,15 @@ public class TaskPersistenceService {
private static final int MAX_BATCH_SIZE = 50;
public boolean save(TaskDO task) {
return taskDAO.save(task);
boolean success = taskDAO.save(task);
if (!success) {
try {
Thread.sleep(100);
success = taskDAO.save(task);
}catch (Exception ignore) {
}
}
return success;
}
public boolean batchSave(List<TaskDO> tasks) {

View File

@ -0,0 +1,20 @@
package com.github.kfcfans.oms.worker.pojo.request;
import lombok.Data;
/**
* 广播任务 preExecute 结束信息
*
* @author tjq
* @since 2020/3/23
*/
@Data
public class BroadcastTaskPreExecuteFinishedReq {
private String jobId;
private String instanceId;
private String taskId;
private boolean success;
private String msg;
}

View File

@ -9,7 +9,7 @@ import lombok.Data;
* @since 2020/3/17
*/
@Data
public class WorkerReportTaskStatusReq {
public class ProcessorReportTaskStatusReq {
private String jobId;
private String instanceId;

View File

@ -18,24 +18,27 @@ public class TaskTrackerStartTaskReq {
private String jobId;
private String instanceId;
// 任务执行类型单机广播MR
private String executeType;
// 处理器类型JavaBeanJar脚本等
private String processorType;
// 处理器信息
private String processorInfo;
// 并发计算线程数
private int threadConcurrency;
// JobTracker 地址
private String jobTrackerAddress;
// TaskTracker 地址
private String taskTrackerAddress;
private String jobParams;
private String instanceParams;
private String taskId;
private String taskName;
private byte[] taskContent;
private byte[] subTaskContent;
// 子任务允许的重试次数
private int taskRetryNum;
private int maxRetryTimes;
// 子任务当前重试次数
private int currentRetryNum;
private int currentRetryTimes;
public TaskTrackerStartTaskReq(JobInstanceInfo instanceInfo, TaskDO task) {
jobId = instanceInfo.getJobId();
@ -43,15 +46,16 @@ public class TaskTrackerStartTaskReq {
processorType = instanceInfo.getProcessorType();
processorInfo = instanceInfo.getProcessorInfo();
threadConcurrency = instanceInfo.getThreadConcurrency();
jobTrackerAddress = NetUtils.getLocalHost();
executeType = instanceInfo.getExecuteType();
taskTrackerAddress = NetUtils.getLocalHost();
jobParams = instanceInfo.getJobParams();
instanceParams = instanceInfo.getInstanceParams();
taskName = task.getTaskName();
taskContent = task.getTaskContent();
subTaskContent = task.getTaskContent();
taskRetryNum = instanceInfo.getTaskRetryNum();
currentRetryNum = task.getFailedCnt();
maxRetryTimes = instanceInfo.getTaskRetryNum();
currentRetryTimes = task.getFailedCnt();
}
}

View File

@ -1,6 +1,8 @@
package com.github.kfcfans.oms.worker.sdk;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
/**
* 任务上下文
@ -12,7 +14,8 @@ import lombok.Data;
* @author tjq
* @since 2020/3/18
*/
@Data
@Getter
@Setter
public class TaskContext {
private String jobId;
@ -29,4 +32,15 @@ public class TaskContext {
private Object subTask;
private String taskTrackerAddress;
public String getDescription() {
return "jobId='" + jobId + '\'' +
", instanceId='" + instanceId + '\'' +
", taskId='" + taskId + '\'' +
", taskName='" + taskName + '\'' +
", jobParams='" + jobParams + '\'' +
", instanceParams='" + instanceParams + '\'' +
", taskTrackerAddress='" + taskTrackerAddress;
}
}

View File

@ -5,12 +5,25 @@ import com.github.kfcfans.oms.worker.sdk.ProcessResult;
/**
* 基础的处理器适用于单机执行
* TODO真实API不包含异常抛出为了便于开发先加上
*
* @author tjq
* @since 2020/3/18
*/
public interface BasicProcessor {
ProcessResult process(TaskContext context);
ProcessResult process(TaskContext context) throws Exception;
/**
* Processor 初始化方法
*/
default void init() throws Exception {
}
/**
* Processor 销毁方法
*/
default void destroy() throws Exception {
}
}

View File

@ -4,6 +4,7 @@ import com.github.kfcfans.oms.worker.sdk.ProcessResult;
/**
* 广播执行处理器适用于广播执行
* TODO真实API不包含异常抛出为了便于开发先加上
*
* @author tjq
* @since 2020/3/18
@ -13,9 +14,9 @@ public interface BroadcastProcessor extends BasicProcessor {
/**
* 在所有节点广播执行前执行只会在一台机器执行一次
*/
ProcessResult preProcess();
ProcessResult preProcess() throws Exception;
/**
* 在所有节点广播执行完成后执行只会在一台机器执行一次
*/
ProcessResult postProcess();
ProcessResult postProcess() throws Exception;
}

View File

@ -5,6 +5,7 @@ import akka.pattern.Patterns;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.pojo.request.WorkerMapTaskRequest;
import com.github.kfcfans.oms.worker.pojo.response.MapTaskResponse;
@ -71,5 +72,10 @@ public abstract class MapReduceProcessor implements BasicProcessor {
}
}
public boolean isRootTask() {
TaskContext taskContext = ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.get();
return TaskConstant.ROOT_TASK_ID.equals(taskContext.getTaskId());
}
public abstract ProcessResult reduce(TaskContext taskContext, Map<String, String> taskId2Result);
}

View File

@ -1,23 +0,0 @@
package com.github.kfcfans.oms.worker.tracker;
import akka.actor.ActorRef;
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
/**
* 广播任务使用的 TaskTracker
*
* @author tjq
* @since 2020/3/17
*/
public class BroadcastTaskTracker extends TaskTracker {
public BroadcastTaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) {
super(jobInstanceInfo, taskTrackerActorRef);
}
@Override
public void dispatch() {
}
}

View File

@ -1,24 +0,0 @@
package com.github.kfcfans.oms.worker.tracker;
import akka.actor.ActorRef;
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.WorkerMapTaskRequest;
/**
* MapReduce 任务使用的 TaskTracker
*
* @author tjq
* @since 2020/3/17
*/
public class MapReduceTaskTracker extends StandaloneTaskTracker {
public MapReduceTaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) {
super(jobInstanceInfo, taskTrackerActorRef);
}
public void newTask(WorkerMapTaskRequest mapRequest) {
}
}

View File

@ -1,23 +0,0 @@
package com.github.kfcfans.oms.worker.tracker;
import akka.actor.ActorRef;
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
/**
* 单机任务使用的 TaskTracker
*
* @author tjq
* @since 2020/3/17
*/
public class StandaloneTaskTracker extends TaskTracker {
public StandaloneTaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) {
super(jobInstanceInfo, taskTrackerActorRef);
}
@Override
public void dispatch() {
}
}