develop the simple Failover

This commit is contained in:
tjq 2020-03-28 14:09:07 +08:00
parent f844dd8db1
commit be4e41692d
16 changed files with 196 additions and 181 deletions

View File

@ -11,10 +11,10 @@ import lombok.Getter;
*/
@Getter
@AllArgsConstructor
public enum JobInstanceStatus {
RUNNING(1, "运行中"),
SUCCEED(2, "运行成功"),
FAILED(3, "运行失败");
public enum InstanceStatus {
RUNNING(3, "运行中"),
SUCCEED(4, "运行成功"),
FAILED(5, "运行失败");
private int value;
private String des;

View File

@ -1,21 +1,20 @@
package com.github.kfcfans.common;
/**
* akka actor 名称
* RemoteConstant
*
* @author tjq
* @since 2020/3/17
*/
public class AkkaConstant {
public class RemoteConstant {
/**
* 默认端口
*/
public static final int DEFAULT_PORT = 25520;
/**
* 顶层ActoractorSystem名称
*/
/* ************************ AKKA CLIENT ************************ */
public static final int DEFAULT_CLIENT_PORT = 25520;
public static final String ACTOR_SYSTEM_NAME = "oms";
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
@ -25,8 +24,11 @@ public class AkkaConstant {
/* ************************ SERVER ************************ */
/* ************************ AKKA SERVER ************************ */
public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server";
public static final String SERVER_ACTOR_NAME = "server_actor";
/* ************************ OTHERS ************************ */
public static final String EMPTY_ADDRESS = "N/A";
}

View File

@ -6,7 +6,7 @@ import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor;
import com.github.kfcfans.oms.worker.actors.TaskTrackerActor;
import com.github.kfcfans.oms.worker.background.WorkerHealthReportRunnable;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
@ -68,18 +68,18 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
// 初始化 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap();
String localIP = StringUtils.isEmpty(config.getListeningIP()) ? NetUtils.getLocalHost() : config.getListeningIP();
int port = config.getListeningPort() == null ? AkkaConstant.DEFAULT_PORT : config.getListeningPort();
int port = config.getListeningPort() == null ? RemoteConstant.DEFAULT_CLIENT_PORT : config.getListeningPort();
overrideConfig.put("akka.remote.artery.canonical.hostname", localIP);
overrideConfig.put("akka.remote.artery.canonical.port", port);
workerAddress = localIP + ":" + port;
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG_NAME);
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
actorSystem = ActorSystem.create(AkkaConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(TaskTrackerActor.class), AkkaConstant.Task_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
actorSystem = ActorSystem.create(RemoteConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
// 初始化存储

View File

@ -7,7 +7,6 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
@ -15,7 +14,6 @@ import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.List;
@ -50,7 +48,7 @@ public class TaskTrackerActor extends AbstractActor {
log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
} else {
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), req.getStatus(), req.getResult(), false);
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), req.getStatus(), req.getResult());
}
}
@ -121,7 +119,7 @@ public class TaskTrackerActor extends AbstractActor {
// 2. 更新根任务状态广播任务的根任务为 preProcess 任务
int status = success ? TaskStatus.WORKER_PROCESS_SUCCESS.getValue() : TaskStatus.WORKER_PROCESS_FAILED.getValue();
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), status, req.getMsg(), false);
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), status, req.getMsg());
}
/**

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.oms.worker.background;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.model.SystemMetrics;
import com.github.kfcfans.common.request.WorkerHealthReportReq;
import com.github.kfcfans.oms.worker.OhMyWorker;
@ -30,7 +30,7 @@ public class WorkerHealthReportRunnable implements Runnable {
reportReq.setTotalAddress(OhMyWorker.getWorkerAddress());
// 发送请求
String serverPath = AkkaUtils.getAkkaServerNodePath(AkkaConstant.SERVER_ACTOR_NAME);
String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME);
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath);
actorSelection.tell(reportReq, null);
}

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.oms.worker.common.utils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.common.RemoteConstant;
/**
* AKKA 工具类
@ -20,12 +20,12 @@ public class AkkaUtils {
public static String getAkkaRemotePath(String ip, String actorName) {
Integer configPort = OhMyWorker.getConfig().getListeningPort();
int port = configPort == null ? AkkaConstant.DEFAULT_PORT : configPort;
return String.format(AKKA_REMOTE_NODE_PATH, AkkaConstant.ACTOR_SYSTEM_NAME, ip, port, actorName);
int port = configPort == null ? RemoteConstant.DEFAULT_CLIENT_PORT : configPort;
return String.format(AKKA_REMOTE_NODE_PATH, RemoteConstant.ACTOR_SYSTEM_NAME, ip, port, actorName);
}
public static String getAkkaServerNodePath(String actorName) {
return String.format(AKKA_SERVER_NODE_PATH, AkkaConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName);
return String.format(AKKA_SERVER_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName);
}
}

View File

@ -21,6 +21,7 @@ import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor;
import com.google.common.base.Stopwatch;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
@ -40,6 +41,7 @@ public class ProcessorRunnable implements Runnable {
private InstanceInfo instanceInfo;
private final ActorSelection taskTrackerActor;
@Getter
private final TaskDO task;
@Override

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.oms.worker.core.tracker.processor;
import akka.actor.ActorSelection;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
@ -13,7 +13,6 @@ import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportRe
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.concurrent.*;
@ -38,6 +37,8 @@ public class ProcessorTracker {
private ThreadPoolExecutor threadPool;
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
/**
* 创建 ProcessorTracker其实就是创建了个执行用的线程池 T_T
*/
@ -48,7 +49,7 @@ public class ProcessorTracker {
this.instanceInfo = request.getInstanceInfo();
this.instanceId = request.getInstanceInfo().getInstanceId();
this.taskTrackerAddress = request.getTaskTrackerAddress();
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskTrackerAddress, AkkaConstant.Task_TRACKER_ACTOR_NAME);
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME);
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
// 初始化
@ -68,15 +69,25 @@ public class ProcessorTracker {
*/
public void submitTask(TaskDO newTask) {
boolean success = false;
// 1. 设置值并提交执行
newTask.setJobId(instanceInfo.getJobId());
newTask.setInstanceId(instanceInfo.getInstanceId());
newTask.setAddress(taskTrackerAddress);
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask);
try {
threadPool.submit(processorRunnable);
success = true;
}catch (RejectedExecutionException ignore) {
log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
instanceId, newTask.getTaskId(), newTask.getTaskName());
}catch (Exception e) {
log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
}
// 2. 回复接收成功
if (success) {
ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
reportReq.setInstanceId(instanceId);
reportReq.setTaskId(newTask.getTaskId());
@ -88,6 +99,7 @@ public class ProcessorTracker {
log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
}
}
/**
* 任务是否超时
@ -101,11 +113,17 @@ public class ProcessorTracker {
* 初始化线程池
*/
private void initProcessorPool() {
int poolSize = instanceInfo.getThreadConcurrency();
// 待执行队列为了防止对内存造成较大压力内存队列不能太大
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE);
// 自定义线程池中线程名称
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build();
threadPool = new ThreadPoolExecutor(instanceInfo.getTaskRetryNum(), instanceInfo.getTaskRetryNum(), 60L, TimeUnit.SECONDS, queue, threadFactory);
// 拒绝策略直接抛出异常
RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy();
threadPool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);
// 当没有任务执行时允许销毁核心线程即线程池最终存活线程个数可能为0
threadPool.allowCoreThreadTimeOut(true);
}
@ -117,7 +135,7 @@ public class ProcessorTracker {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build();
ScheduledExecutorService timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);
timingPool.scheduleAtFixedRate(new TimingStatusReportRunnable(), 0, 15, TimeUnit.SECONDS);
timingPool.scheduleAtFixedRate(new TimingStatusReportRunnable(), 0, 10, TimeUnit.SECONDS);
}

View File

@ -3,12 +3,12 @@ package com.github.kfcfans.oms.worker.core.tracker.task;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.JobInstanceStatus;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.oms.worker.common.constants.CommonSJ;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
@ -89,7 +89,7 @@ public class TaskTracker {
persistenceRootTask();
// 3. 启动定时任务任务派发 & 状态检查
scheduledPool.scheduleWithFixedDelay(new DispatcherRunnable(), 0, 5, TimeUnit.SECONDS);
scheduledPool.scheduleWithFixedDelay(new DispatcherRunnable(), 0, 1, TimeUnit.SECONDS);
scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS);
log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", req.getInstanceId(), req);
@ -100,16 +100,11 @@ public class TaskTracker {
* 更新任务状态
* 任务状态机只允许数字递增
*/
public void updateTaskStatus(String instanceId, String taskId, int newStatus, @Nullable String result, boolean force) {
public void updateTaskStatus(String instanceId, String taskId, int newStatus, @Nullable String result) {
boolean updateResult;
TaskStatus nTaskStatus = TaskStatus.of(newStatus);
// 1. 强制模式直接执行持久化操作该模式状态只允许变更为非完成状态
if (force) {
updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, nTaskStatus, result);
}else {
// 2. 读取当前 Task 状态防止逆状态机变更的出现
// 1. 读取当前 Task 状态防止逆状态机变更的出现
Optional<TaskStatus> dbTaskStatusOpt = taskPersistenceService.getTaskStatus(instanceId, taskId);
if (!dbTaskStatusOpt.isPresent()) {
@ -117,7 +112,7 @@ public class TaskTracker {
instanceId, taskId, newStatus);
}
// 数据库没查到也允许写入这个还需要日后仔细考虑
// 2. 数据库没查到也允许写入这个还需要日后仔细考虑
if (dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH).getValue() > newStatus) {
// 必存在但不怎么写Java会警告...
TaskStatus dbTaskStatus = dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH);
@ -132,7 +127,13 @@ public class TaskTracker {
// 数据库查询失败的话就只重试一次
int failedCnt = taskPersistenceService.getTaskFailedCnt(instanceId, taskId).orElse(instanceInfo.getTaskRetryNum() - 1);
if (failedCnt < instanceInfo.getTaskRetryNum()) {
boolean retryTask = taskPersistenceService.updateRetryTask(instanceId, taskId, failedCnt + 1);
TaskDO updateEntity = new TaskDO();
updateEntity.setFailedCnt(failedCnt + 1);
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
if (retryTask) {
log.info("[TaskTracker-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, taskId);
return;
@ -141,8 +142,10 @@ public class TaskTracker {
}
// 4. 更新状态失败重试写入DB失败的也就不重试了...谁让你那么倒霉呢...
updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, nTaskStatus, result);
}
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(nTaskStatus.getValue());
updateEntity.setResult(result);
updateResult = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
if (!updateResult) {
log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId);
@ -180,7 +183,7 @@ public class TaskTracker {
ProcessorTrackerStatus processorTrackerStatus = pTAddress2Status.get(heartbeatReq.getIp());
processorTrackerStatus.update(heartbeatReq);
log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceInfo.getInstanceId(), processorTrackerStatus);
}
public boolean finished() {
@ -232,6 +235,10 @@ public class TaskTracker {
@Override
public void run() {
if (finished()) {
return;
}
Stopwatch stopwatch = Stopwatch.createStarted();
String instanceId = instanceInfo.getInstanceId();
@ -251,13 +258,14 @@ public class TaskTracker {
// 3. 避免大查询分批派发任务
long currentDispatchNum = 0;
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency();
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2;
AtomicInteger index = new AtomicInteger(0);
// 4. 循环查询数据库获取需要派发的任务
while (maxDispatchNum > currentDispatchNum) {
List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, DB_QUERY_LIMIT);
int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum);
List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit);
currentDispatchNum += needDispatchTasks.size();
needDispatchTasks.forEach(task -> {
@ -266,28 +274,31 @@ public class TaskTracker {
// 获取 ProcessorTracker 地址如果 Task 中自带了 Address则使用该 Address
String ptAddress = task.getAddress();
if (StringUtils.isEmpty(ptAddress)) {
if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
}
String ptActorPath = AkkaUtils.getAkkaRemotePath(ptAddress, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
String ptActorPath = AkkaUtils.getAkkaRemotePath(ptAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath);
ptActor.tell(startTaskReq, null);
// 更新 ProcessorTrackerStatus 状态
pTAddress2Status.get(ptAddress).setDispatched(true);
// 更新数据库如果更新数据库失败可能导致重复执行先不处理
taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, null);
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}} successfully.)", task.getInstanceId(), task.getTaskId(), task.getTaskName());
log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", task.getInstanceId(), task.getTaskId(), task.getTaskName());
});
// 数量不足 查询失败则终止循环
if (needDispatchTasks.size() < DB_QUERY_LIMIT) {
if (needDispatchTasks.size() < dbQueryLimit) {
log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch);
return;
}
}
log.debug("[TaskTracker-{}] dispatch {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch);
log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch);
}
}
@ -365,15 +376,15 @@ public class TaskTracker {
finished.set(finishedBoolean);
}
String serverPath = AkkaUtils.getAkkaServerNodePath(AkkaConstant.SERVER_ACTOR_NAME);
String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME);
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
// 3. 执行成功报告服务器
// 3. 执行完毕报告服务器
if (finished.get() && resultTask != null) {
boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
req.setResult(resultTask.getResult());
req.setInstanceStatus(success ? JobInstanceStatus.SUCCEED.getValue() : JobInstanceStatus.FAILED.getValue());
req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getValue() : InstanceStatus.FAILED.getValue());
CompletionStage<Object> askCS = Patterns.ask(serverActor, req, Duration.ofMillis(TIME_OUT_MS));
@ -396,7 +407,7 @@ public class TaskTracker {
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
stopRequest.setInstanceId(instanceId);
allWorkerAddress.forEach(ptIP -> {
String ptPath = AkkaUtils.getAkkaRemotePath(ptIP, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
String ptPath = AkkaUtils.getAkkaRemotePath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath);
// 不可靠通知ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭
ptActor.tell(stopRequest, null);
@ -410,18 +421,27 @@ public class TaskTracker {
}
// 4. 未完成上报状态
req.setInstanceStatus(JobInstanceStatus.RUNNING.getValue());
req.setInstanceStatus(InstanceStatus.RUNNING.getValue());
serverActor.tell(req, null);
// 5.1 超时检查 -> 派发未接受的任务
// 5.1 超时检查 -> 重试派发后未确认的任务
long currentMS = System.currentTimeMillis();
if (workerUnreceivedNum != 0) {
taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
if (elapsedTime > TIME_OUT_MS) {
updateTaskStatus(instanceId, uncheckTask.getTaskId(), TaskStatus.WAITING_DISPATCH.getValue(), null, true);
log.warn("[TaskTracker-{}] task(taskId={}) try to dispatch again due to unreceived the response from processor tracker.",
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
// 特殊任务只能本机执行
if (!TaskConstant.LAST_TASK_ID.equals(uncheckTask.getTaskId())) {
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
}
taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);
log.warn("[TaskTracker-{}] task(taskId={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
instanceId, uncheckTask.getTaskId());
}

View File

@ -66,16 +66,29 @@ public class TaskPersistenceService {
return false;
}
/**
* 依靠主键更新 Task
*/
public boolean updateTask(String instanceId, String taskId, TaskDO updateEntity) {
try {
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
return execute(() -> taskDAO.simpleUpdate(query, updateEntity));
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTask failed.", e);
}
return false;
}
/**
* 获取 MapReduce Broadcast 的最后一个任务
*/
public Optional<TaskDO> getLastTask(String instanceId) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setTaskName(TaskConstant.LAST_TASK_NAME);
return execute(() -> {
List<TaskDO> taskDOS = taskDAO.simpleQuery(query);
if (CollectionUtils.isEmpty(taskDOS)) {
return Optional.empty();
@ -91,9 +104,9 @@ public class TaskPersistenceService {
public List<TaskDO> getAllTask(String instanceId) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
return execute(() -> {
return taskDAO.simpleQuery(query);
});
}catch (Exception e) {
@ -107,14 +120,11 @@ public class TaskPersistenceService {
*/
public List<TaskDO> getTaskByStatus(String instanceId, TaskStatus status, int limit) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setStatus(status.getValue());
query.setLimit(limit);
return taskDAO.simpleQuery(query);
});
return execute(() -> taskDAO.simpleQuery(query));
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e);
}
@ -127,13 +137,14 @@ public class TaskPersistenceService {
*/
public Map<TaskStatus, Long> getTaskStatusStatistics(String instanceId) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setQueryContent("status, count(*) as num");
query.setOtherCondition("GROUP BY status");
List<Map<String, Object>> dbRES = taskDAO.simpleQueryPlus(query);
return execute(() -> {
List<Map<String, Object>> dbRES = taskDAO.simpleQueryPlus(query);
Map<TaskStatus, Long> result = Maps.newHashMap();
dbRES.forEach(row -> {
// H2 数据库都是大写...
@ -149,25 +160,6 @@ public class TaskPersistenceService {
return Maps.newHashMap();
}
/**
* 获取等待执行的任务数量 ProcessorTracker侧
* @param spInstanceId 特殊处理的 instanceId
* @return 数量
*/
public Optional<Long> getWaitingToRunTaskNum(String spInstanceId) {
try {
return execute(() -> {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setQueryContent("count(*) as num");
Long num = Long.parseLong(taskDAO.simpleQueryPlus(query).get(0).get("NUM").toString());
return Optional.of(num);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] getWaitingToRunTaskNum for instance(id={}) failed.", spInstanceId, e);
}
return Optional.empty();
}
/**
* 查询 taskId -> taskResultreduce阶段或postProcess 阶段使用
*/
@ -186,9 +178,9 @@ public class TaskPersistenceService {
public Optional<TaskStatus> getTaskStatus(String instanceId, String taskId) {
try {
return execute(() -> {
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
query.setQueryContent("STATUS");
return execute(() -> {
List<Map<String, Object>> rows = taskDAO.simpleQueryPlus(query);
return Optional.of(TaskStatus.of((int) rows.get(0).get("STATUS")));
});
@ -204,9 +196,9 @@ public class TaskPersistenceService {
public Optional<Integer> getTaskFailedCnt(String instanceId, String taskId) {
try {
return execute(() -> {
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
query.setQueryContent("failed_cnt");
return execute(() -> {
List<Map<String, Object>> rows = taskDAO.simpleQueryPlus(query);
// 查询成功不可能为空
return Optional.of((Integer) rows.get(0).get("FAILED_CNT"));
@ -217,23 +209,6 @@ public class TaskPersistenceService {
return Optional.empty();
}
/**
* 更新 Task 的状态
*/
public boolean updateTaskStatus(String instanceId, String taskId, TaskStatus status, String result) {
try {
return execute(() -> {
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(status.getValue());
updateEntity.setResult(result);
return taskDAO.simpleUpdate(genKeyQuery(instanceId, taskId), updateEntity);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTaskStatus failed, instanceId={},taskId={},status={},result={}.",
instanceId, taskId, status, result, e);
}
return false;
}
/**
* 批量更新 Task 状态
@ -258,23 +233,6 @@ public class TaskPersistenceService {
return false;
}
public boolean updateRetryTask(String instanceId, String taskId, int failedCnt) {
try {
return execute(() -> {
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
// 重新选取 worker 节点重试
updateEntity.setAddress("");
updateEntity.setFailedCnt(failedCnt);
return taskDAO.simpleUpdate(genKeyQuery(instanceId, taskId), updateEntity);
});
}catch (Exception e) {
log.error("[TaskPersistenceService] updateRetryTask failed, instanceId={},taskId={},failedCnt={}.", instanceId, taskId, failedCnt, e);
}
return false;
}
public boolean batchDelete(String instanceId, List<String> taskIds) {
try {

View File

@ -25,14 +25,24 @@ public class ProcessorTrackerStatus {
private long remainTaskNum;
// 是否被派发过任务
private boolean dispatched;
// 是否接收到过来自 ProcessorTracker 的心跳
private boolean connected;
public void init(String ip) {
this.ip = ip;
/**
* 初始化 ProcessorTracker此时并未持有实际的 ProcessorTracker 状态
*/
public void init(String ptIP) {
this.ip = ptIP;
this.lastActiveTime = System.currentTimeMillis();
this.remainTaskNum = 0;
this.dispatched = false;
this.connected = false;
}
/**
* 接收到 ProcessorTracker 的心跳信息后更新状态
* @param req ProcessorTracker的心跳信息
*/
public void update(ProcessorTrackerStatusReportReq req) {
// 延迟到达的请求直接忽略
@ -44,6 +54,7 @@ public class ProcessorTrackerStatus {
this.lastActiveTime = req.getTime();
this.remainTaskNum = req.getRemainTaskNum();
this.dispatched = true;
this.connected = true;
}
/**
@ -56,6 +67,11 @@ public class ProcessorTrackerStatus {
return true;
}
// 已派发但未收到响应则不可用
if (!connected) {
return false;
}
// 长时间未收到心跳消息则不可用
if (isTimeout()) {
return false;

View File

@ -4,7 +4,7 @@ import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
@ -57,7 +57,7 @@ public abstract class MapReduceProcessor implements BasicProcessor {
// 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常
boolean requestSucceed = false;
try {
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(task.getAddress(), AkkaConstant.Task_TRACKER_ACTOR_NAME);
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME);
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
CompletionStage<Object> requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS));
AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);

View File

@ -6,7 +6,7 @@ import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
@ -36,7 +36,7 @@ public class ProcessorTrackerTest {
worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
remoteProcessorTracker = testAS.actorSelection(akkaRemotePath);
}

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.oms;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
@ -34,7 +34,7 @@ public class TaskTrackerTest {
worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), AkkaConstant.Task_TRACKER_ACTOR_NAME);
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), RemoteConstant.Task_TRACKER_ACTOR_NAME);
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
}

View File

@ -37,7 +37,7 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
if (isRootTask()) {
System.out.println("start to map");
List<TestSubTask> subTasks = Lists.newLinkedList();
for (int j = 0; j < 1; j++) {
for (int j = 0; j < 2; j++) {
for (int i = 0; i < 100; i++) {
int x = j * 100 + i;
subTasks.add(new TestSubTask("name" + x, x));
@ -49,7 +49,7 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
return new ProcessResult(true, "MAP_SUCCESS");
}else {
System.out.println("start to process");
// Thread.sleep(1000);
Thread.sleep(1000);
System.out.println(context.getSubTask());
if (context.getCurrentRetryTimes() == 0) {
return new ProcessResult(false, "FIRST_FAILED");

View File

@ -5,7 +5,8 @@
<!-- ConsoleAppender把日志输出到控制台 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) %green([%thread]) - %cyan(%msg%n)</pattern>
<!-- <pattern>%red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) %green([%thread]) - %cyan(%msg%n)</pattern>-->
<pattern>%red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) - %cyan(%msg%n)</pattern>
<!-- 控制台也要使用UTF-8不要使用GBK否则会中文乱码 -->
<charset>UTF-8</charset>
</encoder>