design the cluster ha solution, but it's hard to impl...

tjq 2020-03-25 18:12:16 +08:00
parent c83eda37f2
commit 5e4463061e
25 changed files with 591 additions and 153 deletions

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.worker.common.constants;
package com.github.kfcfans.common;
/**
* Akka actor names
@ -23,4 +23,10 @@ public class AkkaConstant {
public static final String AKKA_CONFIG_NAME = "oms-akka-application.conf";
/* ************************ SERVER ************************ */
public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server";
public static final String SERVER_ACTOR_NAME = "server_actor";
}

View File

@ -0,0 +1,29 @@
package com.github.kfcfans.common.model;
import lombok.Data;
/**
* System metrics
*
* @author tjq
* @since 2020/3/25
*/
@Data
public class SystemMetrics {
// number of CPU cores
private int cpuProcessors;
// CPU load
private double cpuLoad;
// memory, unit: GB
private double jvmUsedMemory;
private double jvmTotalMemory;
private double jvmMaxMemory;
// disk, unit: GB
private double diskUsed;
private double diskTotal;
private double diskUsage;
}

View File

@ -0,0 +1,19 @@
package com.github.kfcfans.common.request;
import com.github.kfcfans.common.model.SystemMetrics;
import lombok.Data;
/**
* Worker health report (the heartbeat that each worker sends periodically)
*
* @author tjq
* @since 2020/3/25
*/
@Data
public class WorkerHealthReportReq {
// local address -> IP:port
private String totalAddress;
private SystemMetrics systemMetrics;
}

View File

@ -1,11 +1,11 @@
package com.github.kfcfans.oms.worker.pojo.response;
package com.github.kfcfans.common.response;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Response for WorkerMapTaskRequest
* Response for Patterns.ask
*
* @author tjq
* @since 2020/3/18
@ -13,6 +13,6 @@ import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MapTaskResponse {
public class AskResponse {
private boolean success;
}

View File

@ -1,21 +1,21 @@
package com.github.kfcfans.oms.worker;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor;
import com.github.kfcfans.oms.worker.actors.TaskTrackerActor;
import com.github.kfcfans.oms.worker.background.WorkerHealthReportRunnable;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
@ -24,6 +24,10 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* Client (worker) bootstrap class
@ -36,9 +40,13 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
@Getter
private static OhMyConfig config;
public static ActorSystem actorSystem;
@Getter
private static String currentServer;
@Getter
private static String workerAddress;
public static ActorRef processorTracker;
public static ActorSystem actorSystem;
private static ScheduledExecutorService timingPool;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
@ -63,20 +71,26 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
int port = config.getListeningPort() == null ? AkkaConstant.DEFAULT_PORT : config.getListeningPort();
overrideConfig.put("akka.remote.artery.canonical.hostname", localIP);
overrideConfig.put("akka.remote.artery.canonical.port", port);
log.info("[OhMyWorker] akka-remote listening address: {}:{}", localIP, port);
workerAddress = localIP + ":" + port;
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
actorSystem = ActorSystem.create(AkkaConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(TaskTrackerActor.class), AkkaConstant.Task_TRACKER_ACTOR_NAME);
processorTracker = actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
// initialize local storage
TaskPersistenceService.INSTANCE.init();
log.info("[OhMyWorker] local storage initialized successfully.");
// initialize scheduled tasks
ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build();
timingPool = Executors.newScheduledThreadPool(2, timingPoolFactory);
timingPool.scheduleAtFixedRate(new WorkerHealthReportRunnable(), 0, 30, TimeUnit.SECONDS);
log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch);
}catch (Exception e) {

View File

@ -10,7 +10,7 @@ import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.oms.worker.pojo.response.MapTaskResponse;
import com.github.kfcfans.common.response.AskResponse;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
@ -45,7 +45,7 @@ public class TaskTrackerActor extends AbstractActor {
if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
} else {
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), req.getStatus(), req.getResult());
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), req.getStatus(), req.getResult(), false);
}
}
@ -77,7 +77,7 @@ public class TaskTrackerActor extends AbstractActor {
log.warn("[TaskTrackerActor] process map task(instanceId={}) failed.", req.getInstanceId(), e);
}
MapTaskResponse response = new MapTaskResponse(success);
AskResponse response = new AskResponse(success);
getSender().tell(response, getSelf());
}
@ -92,30 +92,28 @@ public class TaskTrackerActor extends AbstractActor {
return;
}
log.info("[TaskTrackerActor] Instance(id={}) pre process finished.", req.getInstanceId());
log.info("[TaskTrackerActor] instance(id={}) pre process finished.", req.getInstanceId());
// 1. update the root task status (for broadcast jobs, the root task is the preProcess task)
// 1. generate sub-tasks for the cluster
boolean success = req.isSuccess();
int status = success ? TaskStatus.WORKER_PROCESS_SUCCESS.getValue() : TaskStatus.WORKER_PROCESS_FAILED.getValue();
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), status, req.getMsg());
if (success) {
List<String> allWorkerAddress = taskTracker.getAllWorkerAddress();
List<TaskDO> subTaskList = Lists.newLinkedList();
for (int i = 0; i < allWorkerAddress.size(); i++) {
TaskDO subTask = new TaskDO();
subTask.setTaskName(TaskConstant.BROADCAST_TASK_NAME);
subTask.setTaskId(TaskConstant.ROOT_TASK_ID + "." + i);
// 2. if the pre-task failed, the whole broadcast job fails
if (!req.isSuccess()) {
subTaskList.add(subTask);
}
taskTracker.addTask(subTaskList);
}else {
log.info("[TaskTrackerActor] BroadcastTask(instanceId={}) failed because of preProcess failed.", req.getInstanceId());
return;
}
// 3. the pre-task succeeded; prepare broadcast execution across the cluster by generating sub-tasks for the other workers
List<String> allWorkerAddress = taskTracker.getAllWorkerAddress();
List<TaskDO> subTaskList = Lists.newLinkedList();
for (int i = 0; i < allWorkerAddress.size(); i++) {
TaskDO subTask = new TaskDO();
subTask.setTaskName(TaskConstant.BROADCAST_TASK_NAME);
subTask.setTaskId(TaskConstant.ROOT_TASK_ID + "." + i);
subTaskList.add(subTask);
}
taskTracker.addTask(subTaskList);
// 2. update the root task status (for broadcast jobs, the root task is the preProcess task)
int status = success ? TaskStatus.WORKER_PROCESS_SUCCESS.getValue() : TaskStatus.WORKER_PROCESS_FAILED.getValue();
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), status, req.getMsg(), false);
}
/**

View File

@ -0,0 +1,43 @@
package com.github.kfcfans.oms.worker.background;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* Server selection
*
* @author tjq
* @since 2020/3/25
*/
public class ServerSelectionService {
private static String appName;
private static List<String> allServerAddress;
private static String defaultServerAddress;
/**
* Get the default server; the same appName always maps to the same server
*/
private static String getDefaultServer() {
if (!StringUtils.isEmpty(defaultServerAddress)) {
return defaultServerAddress;
}
Long index = letter2Num(appName);
defaultServerAddress = allServerAddress.get(index.intValue() % allServerAddress.size());
return defaultServerAddress;
}
private static Long letter2Num(String str) {
if (StringUtils.isEmpty(str)) {
return 0L;
}
AtomicLong res = new AtomicLong(0);
str.chars().forEach(res::addAndGet);
return res.get();
}
}
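
The selection is deterministic: letter2Num simply sums the character codes of appName, so every worker of the same app resolves the same index into allServerAddress. A minimal stand-alone sketch of that idea (the server list, port and app name below are made up, and how allServerAddress gets populated is not shown in this diff):

import java.util.Arrays;
import java.util.List;

// Illustration only: why the same appName always maps to the same server.
public class ServerSelectionDemo {
    public static void main(String[] args) {
        List<String> servers = Arrays.asList("192.168.1.1:10086", "192.168.1.2:10086");
        String appName = "oms-test";
        long hash = appName.chars().asLongStream().sum(); // same idea as letter2Num: sum of char codes
        String defaultServer = servers.get((int) (hash % servers.size()));
        System.out.println(appName + " -> " + defaultServer); // identical on every worker of "oms-test"
    }
}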

View File

@ -0,0 +1,37 @@
package com.github.kfcfans.oms.worker.background;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.common.model.SystemMetrics;
import com.github.kfcfans.common.request.WorkerHealthReportReq;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.SystemInfoUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* Periodic worker health report
*
* @author tjq
* @since 2020/3/25
*/
@Slf4j
@AllArgsConstructor
public class WorkerHealthReportRunnable implements Runnable {
@Override
public void run() {
SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics();
WorkerHealthReportReq reportReq = new WorkerHealthReportReq();
reportReq.setSystemMetrics(systemMetrics);
reportReq.setTotalAddress(OhMyWorker.getWorkerAddress());
// send the request
String serverPath = AkkaUtils.getAkkaServerNodePath(AkkaConstant.SERVER_ACTOR_NAME);
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath);
actorSelection.tell(reportReq, null);
}
}
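
The report is a fire-and-forget tell to the actor named SERVER_ACTOR_NAME; no server-side actor appears in this diff. A purely hypothetical sketch of a receiver, only to show how WorkerHealthReportReq might be consumed (the class name and handling logic are invented):

import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.WorkerHealthReportReq;

// Hypothetical server-side actor, not part of this commit.
public class ServerActorSketch extends AbstractActor {
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                // e.g. remember "address -> metrics + timestamp" so the server can pick healthy workers
                .match(WorkerHealthReportReq.class, req ->
                        System.out.println(req.getTotalAddress() + " -> " + req.getSystemMetrics()))
                .build();
    }
}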

View File

@ -8,8 +8,24 @@ package com.github.kfcfans.oms.worker.common.constants;
*/
public class TaskConstant {
/**
* name of every root task
*/
public static final String ROOT_TASK_NAME = "OMS_ROOT_TASK";
public static final String BROADCAST_TASK_NAME = "OMS_BROADCAST_TASK";
/**
* ID of every root task
*/
public static final String ROOT_TASK_ID = "0";
/**
* name of broadcast sub-tasks
*/
public static final String BROADCAST_TASK_NAME = "OMS_BROADCAST_TASK";
/**
* name of the final task (the reduce task of MapReduce and the postProcess task of Broadcast use it)
*/
public static final String LAST_TASK_NAME = "OMS_LAST_TASK";
// any value other than 0 will do
public static final String LAST_TASK_ID = "9999";
}
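
Taken together, these constants describe the task-ID layout of one instance: the root task is "0", broadcast sub-tasks derive their IDs from the root (see TaskTrackerActor above), and the single final task always uses "9999". A tiny stand-alone illustration, assuming a broadcast instance with three workers:

import java.util.ArrayList;
import java.util.List;

// Illustration only: the task IDs a 3-worker broadcast instance would contain.
public class TaskIdLayoutDemo {
    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        ids.add("0");                      // ROOT_TASK_ID, runs preProcess on one machine
        for (int i = 0; i < 3; i++) {
            ids.add("0" + "." + i);        // one BROADCAST_TASK per worker: 0.0, 0.1, 0.2
        }
        ids.add("9999");                   // LAST_TASK_ID, runs postProcess after all sub-tasks finish
        System.out.println(ids);           // [0, 0.0, 0.1, 0.2, 9999]
    }
}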

View File

@ -17,14 +17,14 @@ public enum TaskStatus {
WAITING_DISPATCH(1, "等待调度器调度"),
DISPATCH_SUCCESS_WORKER_UNCHECK(2, "调度成功但不保证worker收到"),
WORKER_PROCESSING(3, "worker开始执行"),
WORKER_PROCESS_SUCCESS(4, "worker执行成功"),
WORKER_PROCESS_FAILED(5, "worker执行失败"),
WORKER_PROCESS_FAILED(4, "worker执行失败"),
WORKER_PROCESS_SUCCESS(5, "worker执行成功"),
/* ******************* Worker 专用 ******************* */
RECEIVE_SUCCESS(11, "成功接受任务但未开始执行此时worker满载暂时无法运行"),
PROCESSING(12, "执行中"),
PROCESS_SUCCESS(13, "执行成功"),
PROCESS_FAILED(14, "执行失败");
PROCESS_FAILED(13, "执行失败"),
PROCESS_SUCCESS(14, "执行成功");
private int value;
private String des;
@ -34,13 +34,13 @@ public enum TaskStatus {
case 1: return WAITING_DISPATCH;
case 2: return DISPATCH_SUCCESS_WORKER_UNCHECK;
case 3: return WORKER_PROCESSING;
case 4: return WORKER_PROCESS_SUCCESS;
case 5: return WORKER_PROCESS_FAILED;
case 4: return WORKER_PROCESS_FAILED;
case 5: return WORKER_PROCESS_SUCCESS;
case 11: return RECEIVE_SUCCESS;
case 12: return PROCESSING;
case 13: return PROCESS_SUCCESS;
case 14: return PROCESS_FAILED;
case 13: return PROCESS_FAILED;
case 14: return PROCESS_SUCCESS;
}
throw new IllegalArgumentException("no TaskStatus match the value of " + v);
}

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.oms.worker.common.utils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
import com.github.kfcfans.common.AkkaConstant;
/**
* Akka utility class
@ -16,10 +16,16 @@ public class AkkaUtils {
*/
private static final String AKKA_REMOTE_NODE_PATH = "akka://%s@%s:%d/user/%s";
private static final String AKKA_SERVER_NODE_PATH = "akka://%s@%s/user/%s";
public static String getAkkaRemotePath(String ip, String actorName) {
Integer configPort = OhMyWorker.getConfig().getListeningPort();
int port = configPort == null ? AkkaConstant.DEFAULT_PORT : configPort;
return String.format(AKKA_REMOTE_NODE_PATH, AkkaConstant.ACTOR_SYSTEM_NAME, ip, port, actorName);
}
public static String getAkkaServerNodePath(String actorName) {
return String.format(AKKA_SERVER_NODE_PATH, AkkaConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName);
}
}

View File

@ -0,0 +1,55 @@
package com.github.kfcfans.oms.worker.common.utils;
import com.github.kfcfans.common.model.SystemMetrics;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
/**
* System info utilities, used to collect monitoring metrics
*
* @author tjq
* @since 2020/3/25
*/
public class SystemInfoUtils {
// JMX bean can be accessed externally and is meant for management tools like hyperic ( or even nagios ) - It would delegate to Runtime anyway.
private static final Runtime runtime = Runtime.getRuntime();
private static OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();
public static SystemMetrics getSystemMetrics() {
SystemMetrics metrics = new SystemMetrics();
// CPU info
metrics.setCpuProcessors(osMXBean.getAvailableProcessors());
metrics.setCpuLoad(osMXBean.getSystemLoadAverage());
// JVM memory info (maxMemory is the maximum memory the JVM can obtain from the OS, i.e. the value set by -Xmx; totalMemory is the total memory the JVM currently holds)
metrics.setJvmMaxMemory(bytes2GB(runtime.maxMemory()));
metrics.setJvmTotalMemory(bytes2GB(runtime.totalMemory()));
metrics.setJvmUsedMemory(metrics.getJvmTotalMemory() - bytes2GB(runtime.freeMemory()));
// disk info
long free = 0;
long total = 0;
File[] roots = File.listRoots();
for (File file : roots) {
free += file.getFreeSpace();
total += file.getTotalSpace();
}
metrics.setDiskUsed(bytes2GB(total - free));
metrics.setDiskTotal(bytes2GB(total));
metrics.setDiskUsage(metrics.getDiskUsed() / metrics.getDiskTotal());
return metrics;
}
private static double bytes2GB(long bytes) {
return bytes / 1024.0 / 1024 / 1024;
}
}

View File

@ -1,6 +1,5 @@
package com.github.kfcfans.oms.worker.core.executor;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.ExecuteType;
@ -10,6 +9,7 @@ import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
@ -17,11 +17,15 @@ import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor;
import com.google.common.base.Stopwatch;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.Map;
/**
* Processor executor
*
@ -40,38 +44,47 @@ public class ProcessorRunnable implements Runnable {
@Override
public void run() {
log.debug("[ProcessorRunnable] start to run task(instanceId={}&taskId={}&taskName={})", request.getInstanceId(), request.getTaskId(), request.getTaskName());
String taskId = request.getTaskId();
String instanceId = request.getInstanceId();
log.debug("[ProcessorRunnable] start to run task(instanceId={}&taskId={}&taskName={})", instanceId, taskId, request.getTaskName());
try {
// 0. create the reply
ProcessorReportTaskStatusReq reportStatus = new ProcessorReportTaskStatusReq();
BeanUtils.copyProperties(request, reportStatus);
// 0. prepare the execution context & report execution info
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(request, taskContext);
if (request.getSubTaskContent() != null && request.getSubTaskContent().length > 0) {
taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent()));
}
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
reportStatus(TaskStatus.WORKER_PROCESSING, null);
// 1. obtain the Processor
BasicProcessor processor = getProcessor();
if (processor == null) {
reportStatus.setStatus(TaskStatus.PROCESS_FAILED.getValue());
reportStatus.setResult("NO_PROCESSOR");
taskTrackerActor.tell(reportStatus, null);
reportStatus(TaskStatus.PROCESS_FAILED, "NO_PROCESSOR");
return;
}
// 2. special handling for the root task
ExecuteType executeType = ExecuteType.valueOf(request.getExecuteType());
if (TaskConstant.ROOT_TASK_ID.equals(request.getTaskId())) {
if (TaskConstant.ROOT_TASK_ID.equals(taskId)) {
// broadcast execution: run preProcess on this machine first; once it finishes, the TaskTracker generates sub-tasks for all workers
if (executeType == ExecuteType.BROADCAST) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
BeanUtils.copyProperties(request, reportStatus);
spReq.setTaskId(taskId);
spReq.setInstanceId(instanceId);
try {
ProcessResult processResult = broadcastProcessor.preProcess();
ProcessResult processResult = broadcastProcessor.preProcess(taskContext);
spReq.setSuccess(processResult.isSuccess());
spReq.setMsg(processResult.getMsg());
}catch (Exception e) {
log.warn("[ProcessorRunnable] broadcast task(jobId={}) preProcess failed.", request.getJobId(), e);
log.warn("[ProcessorRunnable] broadcast task(instanceId={}) preProcess failed.", instanceId, e);
spReq.setSuccess(false);
spReq.setMsg(e.toString());
}
@ -83,38 +96,51 @@ public class ProcessorRunnable implements Runnable {
}
}
// 3. notify the TaskTracker that the task has started running
reportStatus.setStatus(TaskStatus.PROCESSING.getValue());
taskTrackerActor.tell(reportStatus, null);
// 3. special handling for the final task (it always runs on the same machine as the TaskTracker)
if (TaskConstant.LAST_TASK_ID.equals(taskId)) {
// 4. finish preparation before submission
ProcessResult processResult;
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(request, taskContext);
if (request.getSubTaskContent() != null && request.getSubTaskContent().length > 0) {
taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent()));
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[ProcessorRunnable] instance(instanceId={})' last task(taskId={}) start to process.", instanceId, taskId);
ProcessResult lastResult;
Map<String, String> taskId2ResultMap = TaskPersistenceService.INSTANCE.getTaskId2ResultMap(instanceId);
try {
switch (executeType) {
case BROADCAST:
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
lastResult = broadcastProcessor.postProcess(taskContext, taskId2ResultMap);
break;
case MAP_REDUCE:
MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
lastResult = mapReduceProcessor.reduce(taskContext, taskId2ResultMap);
break;
default:
lastResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
}
}catch (Exception e) {
lastResult = new ProcessResult(false, e.toString());
log.warn("[ProcessorRunnable] execute last task(instanceId={},taskId={}) failed.", instanceId, taskId, e);
}
TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED;
reportStatus(status, lastResult.getMsg());
log.info("[ProcessorRunnable] instance(instanceId={},taskId={})' last task execute successfully, using time: {}", instanceId, taskId, stopwatch);
return;
}
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
// 5. submit for actual execution
ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
BeanUtils.copyProperties(request, reportReq);
// 4. submit for actual execution
ProcessResult processResult;
try {
processResult = processor.process(taskContext);
reportReq.setResult(processResult.getMsg());
if (processResult.isSuccess()) {
reportReq.setStatus(TaskStatus.PROCESS_SUCCESS.getValue());
}else {
reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue());
}
}catch (Exception e) {
log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e);
reportReq.setResult(e.toString());
reportReq.setStatus(TaskStatus.PROCESS_FAILED.getValue());
processResult = new ProcessResult(false, e.toString());
}
taskTrackerActor.tell(reportReq, null);
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED, processResult.getMsg());
}catch (Exception e) {
log.error("[ProcessorRunnable] execute failed, please fix this bug!", e);
}
@ -144,4 +170,18 @@ public class ProcessorRunnable implements Runnable {
return processor;
}
/**
* Report status to the TaskTracker
*/
private void reportStatus(TaskStatus status, String result) {
ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq();
req.setInstanceId(request.getInstanceId());
req.setTaskId(request.getTaskId());
req.setStatus(status.getValue());
req.setResult(result);
taskTrackerActor.tell(req, null);
}
}

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.oms.worker.core.tracker.processor;
import akka.actor.ActorSelection;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
@ -157,7 +157,7 @@ public class ProcessorTracker {
}
TaskPersistenceService taskPersistenceService = TaskPersistenceService.INSTANCE;
List<TaskDO> taskDOList = taskPersistenceService.getNeedRunTask(instanceId, MAX_QUEUE_SIZE / 2);
List<TaskDO> taskDOList = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.RECEIVE_SUCCESS, MAX_QUEUE_SIZE / 2);
if (CollectionUtils.isEmpty(taskDOList)) {
return;

View File

@ -2,10 +2,15 @@ package com.github.kfcfans.oms.worker.core.tracker.task;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import ch.qos.logback.core.util.SystemInfo;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.JobInstanceStatus;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.constants.CommonSJ;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
@ -15,7 +20,8 @@ import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStopInstanceReq;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -23,6 +29,7 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@ -79,11 +86,23 @@ public abstract class TaskTracker {
/**
* Update the task status
* the task state machine only allows the numeric status value to increase
*/
public void updateTaskStatus(String instanceId, String taskId, int status,@Nullable String result) {
public void updateTaskStatus(String instanceId, String taskId, int status, @Nullable String result, boolean force) {
// 1. read the current task status to keep a stale message from resetting it
if (!force) {
TaskDO originTask = taskPersistenceService.selectTaskByKey(instanceId, taskId);
if (originTask.getStatus() > status) {
log.warn("[TaskTracker] task(instanceId={},taskId={},dbStatus={},requestStatus={}) status conflict, this request will be drop.",
instanceId, taskId, originTask.getStatus(), status);
return;
}
}
TaskStatus taskStatus = TaskStatus.of(status);
// retry once if persistence fails (local database operations can almost be considered reliable...)
// 2. update the status in the database
boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, taskStatus, result);
if (!updateResult) {
try {
@ -165,7 +184,8 @@ public abstract class TaskTracker {
@Override
public void run() {
taskPersistenceService.getNeedDispatchTask(jobInstanceInfo.getInstanceId()).forEach(task -> {
taskPersistenceService.getTaskByStatus(jobInstanceInfo.getInstanceId(), TaskStatus.WAITING_DISPATCH, 100).forEach(task -> {
try {
// build the execution request for the worker
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq(jobInstanceInfo, task);
@ -196,11 +216,15 @@ public abstract class TaskTracker {
*/
private class StatusCheckRunnable implements Runnable {
private static final long TIME_OUT_MS = 5000;
@Override
public void run() {
final String instanceId = jobInstanceInfo.getInstanceId();
// 1. query statistics
Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(jobInstanceInfo.getInstanceId());
Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId);
long waitingDispatchNum = status2Num.get(TaskStatus.WAITING_DISPATCH);
long workerUnreceivedNum = status2Num.get(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK);
@ -214,28 +238,115 @@ public abstract class TaskTracker {
log.debug("[TaskTracker] status check result({})", status2Num);
TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
req.setJobId(jobInstanceInfo.getJobId());
req.setInstanceId(instanceId);
req.setTotalTaskNum(finishedNum + unfinishedNum);
req.setSucceedTaskNum(succeedNum);
req.setFailedTaskNum(failedNum);
// 2. if the number of unfinished tasks is 0, report to the server
// 2. if the number of unfinished tasks is 0, check whether the instance has really finished and fetch the final task's execution result
TaskDO resultTask = null;
if (unfinishedNum == 0) {
finished.set(true);
if (failedNum == 0) {
req.setInstanceStatus(JobInstanceStatus.SUCCEED.getValue());
}else {
req.setInstanceStatus(JobInstanceStatus.FAILED.getValue());
boolean finishedBoolean = true;
ExecuteType executeType = ExecuteType.valueOf(jobInstanceInfo.getExecuteType());
if (executeType == ExecuteType.STANDALONE) {
List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId);
if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
log.warn("[TaskTracker] there must have some bug in TaskTracker.");
}else {
resultTask = allTask.get(0);
}
} else {
resultTask = taskPersistenceService.getLastTask(instanceId);
// if it does not exist, the preceding tasks have just finished and the lastTask needs to be created
if (resultTask == null) {
finishedBoolean = false;
TaskDO newLastTask = new TaskDO();
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
newLastTask.setTaskId(TaskConstant.LAST_TASK_ID);
newLastTask.setAddress(NetUtils.getLocalHost());
addTask(Lists.newArrayList(newLastTask));
}else {
TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());
finishedBoolean = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED;
}
}
// special handling for MapReduce jobs (run reduce)
// special handling for broadcast jobs (run postProcess)
// notify the ProcessorTracker to release resources (before the PT releases resources...)
}else {
finished.set(finishedBoolean);
}
String serverPath = AkkaUtils.getAkkaServerNodePath(AkkaConstant.SERVER_ACTOR_NAME);
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
// 3. execution finished, report to the server
if (finished.get() && resultTask != null) {
boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
req.setResult(resultTask.getResult());
req.setInstanceStatus(success ? JobInstanceStatus.SUCCEED.getValue() : JobInstanceStatus.FAILED.getValue());
CompletionStage<Object> askCS = Patterns.ask(serverActor, req, Duration.ofMillis(TIME_OUT_MS));
boolean serverAccepted = false;
try {
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
serverAccepted = askResponse.isSuccess();
}catch (Exception e) {
log.warn("[TaskTracker] report finished instance(id={}&result={}) failed.", instanceId, resultTask.getResult());
}
// if the server did not accept the report, wait and report again next round
if (!serverAccepted) {
return;
}
// the server has updated the status and the instance has finished; start releasing all resources
log.info("[TaskTracker] instance(jobId={}&instanceId={}) process finished, result = {}, start to release resource...",
jobInstanceInfo.getJobId(), instanceId, resultTask.getResult());
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
stopRequest.setInstanceId(instanceId);
allWorkerAddress.forEach(ptIP -> {
String ptPath = AkkaUtils.getAkkaRemotePath(ptIP, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath);
// unreliable notification; the ProcessorTracker can also shut itself down through its own scheduled checks / polling
ptActor.tell(stopRequest, null);
});
// destroy the TaskTracker
TaskTrackerPool.remove(instanceId);
destroy();
return;
}
// 4. not finished yet, report the current status
req.setInstanceStatus(JobInstanceStatus.RUNNING.getValue());
serverActor.tell(req, null);
// 5.1 timeout check -> re-dispatch tasks that were never acknowledged
long currentMS = System.currentTimeMillis();
if (workerUnreceivedNum != 0) {
taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
if (elapsedTime > TIME_OUT_MS) {
updateTaskStatus(instanceId, uncheckTask.getTaskId(), TaskStatus.WAITING_DISPATCH.getValue(), null, true);
log.warn("[TaskTracker] task(instanceId={},taskId={}) try to dispatch again due to unreceived the response from processor tracker.",
instanceId, uncheckTask.getTaskId());
}
});
}
// 5.2 timeout check -> for waiting/running tasks, should we adopt the rule "act only when a Worker dies" (re-dispatch tasks only after the Worker goes down)?
}
}
}
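
The new force parameter of updateTaskStatus encodes the rule stated in its Javadoc: the status value may only grow, so a stale ProcessorReportTaskStatusReq can no longer overwrite a newer state, while the timeout branch (5.1) passes force = true to deliberately push DISPATCH_SUCCESS_WORKER_UNCHECK back to WAITING_DISPATCH. A minimal sketch of that guard in isolation (names are illustrative, not the project's API):

// Illustration only of the monotonic-status rule behind updateTaskStatus(..., force).
public class StatusGuardDemo {

    // currentStatus stands in for the value read back from the local task table
    static boolean shouldApply(int currentStatus, int requestedStatus, boolean force) {
        if (force) {
            return true;                          // the timeout path may lower the status on purpose
        }
        return requestedStatus >= currentStatus; // a stale, lower status is silently dropped
    }

    public static void main(String[] args) {
        System.out.println(shouldApply(3, 5, false)); // true : WORKER_PROCESSING -> WORKER_PROCESS_SUCCESS
        System.out.println(shouldApply(5, 3, false)); // false: a late "processing" report is ignored
        System.out.println(shouldApply(2, 1, true));  // true : forced back to WAITING_DISPATCH after timeout
    }
}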

View File

@ -21,4 +21,8 @@ public class TaskTrackerPool {
return instanceId2TaskTracker.get(instanceId);
}
public static void remove(String instanceId) {
instanceId2TaskTracker.remove(instanceId);
}
}

View File

@ -17,7 +17,7 @@ import java.sql.SQLException;
*/
public class ConnectionFactory {
private static DataSource dataSource;
private static volatile DataSource dataSource;
public static Connection getConnection() throws SQLException {
return getDataSource().getConnection();

View File

@ -30,12 +30,15 @@ public interface TaskDAO {
int batchDelete(String instanceId, List<String> taskIds);
TaskDO selectByKey(String instanceId, String taskId);
List<TaskDO> simpleQuery(SimpleTaskQuery query);
List<Map<String, Object>> simpleQueryPlus(SimpleTaskQuery query);
boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField);
/**
* Query taskId -> taskResult (specially tailored for performance, mainly memory footprint; with simpleQueryPlus, three copies of the data would live in memory at once... or is it really three?)
*/
Map<String, String> queryTaskId2TaskResult(String instanceId);
}

View File

@ -83,30 +83,6 @@ public class TaskDAOImpl implements TaskDAO {
return 0;
}
@Override
public TaskDO selectByKey(String instanceId, String taskId) {
String selectSQL = "select * from task_info where instance_id = ? and task_id = ?";
ResultSet rs = null;
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(selectSQL)) {
ps.setString(1, instanceId);
ps.setString(2, taskId);
rs = ps.executeQuery();
if (rs.next()) {
return convert(rs);
}
}catch (Exception e) {
log.error("[TaskDAO] selectByKey failed(instanceId = {}, taskId = {}).", instanceId, taskId, e);
}finally {
if (rs != null) {
try {
rs.close();
}catch (Exception ignore) {
}
}
}
return null;
}
@Override
public List<TaskDO> simpleQuery(SimpleTaskQuery query) {
ResultSet rs = null;
@ -175,6 +151,30 @@ public class TaskDAOImpl implements TaskDAO {
}
}
@Override
public Map<String, String> queryTaskId2TaskResult(String instanceId) {
ResultSet rs = null;
Map<String, String> taskId2Result = Maps.newLinkedHashMapWithExpectedSize(4096);
String sql = "select task_id, result from task_info where instance_id = ?";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, instanceId);
rs = ps.executeQuery();
while (rs.next()) {
taskId2Result.put(rs.getString("task_id"), rs.getString("result"));
}
}catch (Exception e) {
log.error("[TaskDAO] queryTaskId2TaskResult failed(sql = {}).", sql, e);
}finally {
if (rs != null) {
try {
rs.close();
}catch (Exception ignore) {
}
}
}
return taskId2Result;
}
private static TaskDO convert(ResultSet rs) throws SQLException {
TaskDO task = new TaskDO();
task.setTaskId(rs.getString("task_id"));

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.worker.persistence;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.google.common.collect.Maps;
import org.springframework.util.CollectionUtils;
@ -51,29 +52,36 @@ public class TaskPersistenceService {
return taskDAO.batchSave(tasks);
}
/**
* Fetch the tasks that the TaskTracker is about to dispatch to Workers
* Fetch the last task of a MapReduce or Broadcast job
*/
public List<TaskDO> getNeedDispatchTask(String instanceId) {
public TaskDO getLastTask(String instanceId) {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setTaskName(TaskConstant.LAST_TASK_NAME);
List<TaskDO> taskDOS = taskDAO.simpleQuery(query);
if (CollectionUtils.isEmpty(taskDOS)) {
return null;
}
return taskDOS.get(0);
}
public List<TaskDO> getAllTask(String instanceId) {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
query.setLimit(100);
return taskDAO.simpleQuery(query);
}
/**
* Update a Task's status
* Fetch Tasks in the given status
*/
public boolean updateTaskStatus(String instanceId, String taskId, TaskStatus status, String result) {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
condition.setTaskId(taskId);
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(status.getValue());
updateEntity.setResult(result);
return taskDAO.simpleUpdate(condition, updateEntity);
public List<TaskDO> getTaskByStatus(String instanceId, TaskStatus status, int limit) {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setStatus(status.getValue());
query.setLimit(limit);
return taskDAO.simpleQuery(query);
}
/**
@ -97,18 +105,40 @@ public class TaskPersistenceService {
}
/**
* Fetch the tasks that need to be executed
* Query taskId -> taskResult (used in the reduce phase or the postProcess phase)
*/
public List<TaskDO> getNeedRunTask(String instanceId, int limit) {
public Map<String, String> getTaskId2ResultMap(String instanceId) {
return taskDAO.queryTaskId2TaskResult(instanceId);
}
/**
* Find a task by its primary key
*/
public TaskDO selectTaskByKey(String instanceId, String taskId) {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue());
query.setLimit(limit);
return taskDAO.simpleQuery(query);
query.setTaskId(taskId);
List<TaskDO> results = taskDAO.simpleQuery(query);
if (CollectionUtils.isEmpty(results)) {
return null;
}
return results.get(0);
}
/**
* Update a Task's status
*/
public boolean updateTaskStatus(String instanceId, String taskId, TaskStatus status, String result) {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
condition.setTaskId(taskId);
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(status.getValue());
updateEntity.setResult(result);
return taskDAO.simpleUpdate(condition, updateEntity);
}
public int batchDelete(String instanceId, List<String> taskIds) {
return taskDAO.batchDelete(instanceId, taskIds);
}

View File

@ -0,0 +1,18 @@
package com.github.kfcfans.oms.worker.pojo.request;
import lombok.Data;
/**
* The TaskTracker stops the ProcessorTracker and releases related resources
* stopped after the instance finishes OR force-stopped manually
*
* @author tjq
* @since 2020/3/25
*/
@Data
public class TaskTrackerStopInstanceReq {
private String instanceId;
// reserved field, unused for now
private String type;
}

View File

@ -1,6 +1,9 @@
package com.github.kfcfans.oms.worker.sdk.api;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import java.util.Map;
/**
* Broadcast processor, for broadcast execution
@ -14,9 +17,9 @@ public interface BroadcastProcessor extends BasicProcessor {
/**
* Runs before the broadcast execution on all nodes; executed only once, on a single machine
*/
ProcessResult preProcess() throws Exception;
ProcessResult preProcess(TaskContext taskContext) throws Exception;
/**
* Runs after the broadcast execution on all nodes has finished; executed only once, on a single machine
*/
ProcessResult postProcess() throws Exception;
ProcessResult postProcess(TaskContext taskContext, Map<String, String> taskId2Result) throws Exception;
}
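
Both hooks change in this commit: preProcess now receives the TaskContext, and postProcess additionally receives the taskId -> result map collected by the TaskTracker (see getTaskId2ResultMap above). A hedged example implementation against the new signatures; the class and its business logic are invented, and BasicProcessor#process(TaskContext) is assumed from the way ProcessorRunnable invokes it:

import java.util.Map;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;

// Invented example; only the method signatures come from this commit.
public class CacheWarmupProcessor implements BroadcastProcessor {

    @Override
    public ProcessResult preProcess(TaskContext context) {
        // runs once, before the broadcast fans out to the whole cluster
        return new ProcessResult(true, "pre ok");
    }

    @Override
    public ProcessResult process(TaskContext context) {
        // runs on every worker of the cluster
        return new ProcessResult(true, "warmed up");
    }

    @Override
    public ProcessResult postProcess(TaskContext context, Map<String, String> taskId2Result) {
        // runs once after all workers finish; taskId2Result holds each sub-task's result string
        long empty = taskId2Result.values().stream().filter(r -> r == null || r.isEmpty()).count();
        return new ProcessResult(empty == 0, "finished, empty results: " + empty);
    }
}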

View File

@ -4,11 +4,11 @@ import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest;
import com.github.kfcfans.oms.worker.pojo.response.MapTaskResponse;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import lombok.extern.slf4j.Slf4j;
@ -59,7 +59,7 @@ public abstract class MapReduceProcessor implements BasicProcessor {
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskContext.getTaskTrackerAddress(), AkkaConstant.Task_TRACKER_ACTOR_NAME);
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
CompletionStage<Object> requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS));
MapTaskResponse respObj = (MapTaskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
requestSucceed = respObj.isSuccess();
}catch (Exception e) {
log.warn("[MapReduceProcessor] map failed.", e);

View File

@ -6,7 +6,7 @@ import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.common.utils.SystemInfoUtils;
import org.junit.jupiter.api.Test;
/**
@ -12,7 +13,12 @@ import org.junit.jupiter.api.Test;
public class UtilsTest {
@Test
public void testNetUtils() throws Exception {
public void testNetUtils() {
System.out.println("本机IP" + NetUtils.getLocalHost());
}
@Test
public void testSystemInfoUtils() {
System.out.println(SystemInfoUtils.getSystemMetrics());
}
}