From be4e41692dd04cbfd6efffd484d8c064f6232dcf Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 28 Mar 2020 14:09:07 +0800 Subject: [PATCH] develop the simple Failover --- ...nstanceStatus.java => InstanceStatus.java} | 8 +- ...{AkkaConstant.java => RemoteConstant.java} | 16 ++- .../github/kfcfans/oms/worker/OhMyWorker.java | 12 +- .../oms/worker/actors/TaskTrackerActor.java | 6 +- .../WorkerHealthReportRunnable.java | 4 +- .../oms/worker/common/utils/AkkaUtils.java | 8 +- .../core/executor/ProcessorRunnable.java | 2 + .../tracker/processor/ProcessorTracker.java | 48 ++++--- .../worker/core/tracker/task/TaskTracker.java | 124 ++++++++++-------- .../persistence/TaskPersistenceService.java | 110 +++++----------- .../pojo/model/ProcessorTrackerStatus.java | 20 ++- .../worker/sdk/api/MapReduceProcessor.java | 4 +- .../kfcfans/oms/ProcessorTrackerTest.java | 4 +- .../github/kfcfans/oms/TaskTrackerTest.java | 4 +- .../processors/TestMapReduceProcessor.java | 4 +- .../src/test/resources/logback-test.xml | 3 +- 16 files changed, 196 insertions(+), 181 deletions(-) rename oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/{JobInstanceStatus.java => InstanceStatus.java} (65%) rename oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/{AkkaConstant.java => RemoteConstant.java} (61%) diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/JobInstanceStatus.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java similarity index 65% rename from oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/JobInstanceStatus.java rename to oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java index 61d1f18d..95b79a9c 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/JobInstanceStatus.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java @@ -11,10 +11,10 @@ import lombok.Getter; */ @Getter @AllArgsConstructor -public enum JobInstanceStatus { - RUNNING(1, "运行中"), - SUCCEED(2, "运行成功"), - FAILED(3, "运行失败"); +public enum InstanceStatus { + RUNNING(3, "运行中"), + SUCCEED(4, "运行成功"), + FAILED(5, "运行失败"); private int value; private String des; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/AkkaConstant.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java similarity index 61% rename from oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/AkkaConstant.java rename to oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java index a8919ed2..df0977db 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/AkkaConstant.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java @@ -1,21 +1,20 @@ package com.github.kfcfans.common; /** - * akka actor 名称 + * RemoteConstant * * @author tjq * @since 2020/3/17 */ -public class AkkaConstant { +public class RemoteConstant { - /** - * 默认端口 - */ - public static final int DEFAULT_PORT = 25520; /** * 顶层Actor(actorSystem名称) */ + /* ************************ AKKA CLIENT ************************ */ + public static final int DEFAULT_CLIENT_PORT = 25520; + public static final String ACTOR_SYSTEM_NAME = "oms"; public static final String Task_TRACKER_ACTOR_NAME = "task_tracker"; @@ -25,8 +24,11 @@ public class AkkaConstant { - /* ************************ SERVER ************************ */ + /* ************************ AKKA SERVER ************************ */ public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server"; public static final String SERVER_ACTOR_NAME = "server_actor"; + + /* ************************ OTHERS ************************ */ + public static final String EMPTY_ADDRESS = "N/A"; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index 92c29ce0..91006fc2 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -6,7 +6,7 @@ import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor; import com.github.kfcfans.oms.worker.actors.TaskTrackerActor; import com.github.kfcfans.oms.worker.background.WorkerHealthReportRunnable; import com.github.kfcfans.oms.worker.common.OhMyConfig; -import com.github.kfcfans.common.AkkaConstant; +import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.oms.worker.common.utils.NetUtils; import com.github.kfcfans.oms.worker.common.utils.SpringUtils; import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; @@ -68,18 +68,18 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean { // 初始化 ActorSystem Map overrideConfig = Maps.newHashMap(); String localIP = StringUtils.isEmpty(config.getListeningIP()) ? NetUtils.getLocalHost() : config.getListeningIP(); - int port = config.getListeningPort() == null ? AkkaConstant.DEFAULT_PORT : config.getListeningPort(); + int port = config.getListeningPort() == null ? RemoteConstant.DEFAULT_CLIENT_PORT : config.getListeningPort(); overrideConfig.put("akka.remote.artery.canonical.hostname", localIP); overrideConfig.put("akka.remote.artery.canonical.port", port); workerAddress = localIP + ":" + port; log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress); - Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG_NAME); + Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.AKKA_CONFIG_NAME); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); - actorSystem = ActorSystem.create(AkkaConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(TaskTrackerActor.class), AkkaConstant.Task_TRACKER_ACTOR_NAME); - actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME); + actorSystem = ActorSystem.create(RemoteConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig); + actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME); + actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem); // 初始化存储 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java index b86117e3..d8e89573 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java @@ -7,7 +7,6 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.oms.worker.persistence.TaskDO; -import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq; import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; @@ -15,7 +14,6 @@ import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; import java.util.List; @@ -50,7 +48,7 @@ public class TaskTrackerActor extends AbstractActor { log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); } else { - taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), req.getStatus(), req.getResult(), false); + taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), req.getStatus(), req.getResult()); } } @@ -121,7 +119,7 @@ public class TaskTrackerActor extends AbstractActor { // 2. 更新根任务状态(广播任务的根任务为 preProcess 任务) int status = success ? TaskStatus.WORKER_PROCESS_SUCCESS.getValue() : TaskStatus.WORKER_PROCESS_FAILED.getValue(); - taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), status, req.getMsg(), false); + taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), status, req.getMsg()); } /** diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java index 425f7875..387d3a05 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/WorkerHealthReportRunnable.java @@ -1,7 +1,7 @@ package com.github.kfcfans.oms.worker.background; import akka.actor.ActorSelection; -import com.github.kfcfans.common.AkkaConstant; +import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.model.SystemMetrics; import com.github.kfcfans.common.request.WorkerHealthReportReq; import com.github.kfcfans.oms.worker.OhMyWorker; @@ -30,7 +30,7 @@ public class WorkerHealthReportRunnable implements Runnable { reportReq.setTotalAddress(OhMyWorker.getWorkerAddress()); // 发送请求 - String serverPath = AkkaUtils.getAkkaServerNodePath(AkkaConstant.SERVER_ACTOR_NAME); + String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME); ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath); actorSelection.tell(reportReq, null); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java index e74a44ad..cc773323 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java @@ -1,7 +1,7 @@ package com.github.kfcfans.oms.worker.common.utils; import com.github.kfcfans.oms.worker.OhMyWorker; -import com.github.kfcfans.common.AkkaConstant; +import com.github.kfcfans.common.RemoteConstant; /** * AKKA 工具类 @@ -20,12 +20,12 @@ public class AkkaUtils { public static String getAkkaRemotePath(String ip, String actorName) { Integer configPort = OhMyWorker.getConfig().getListeningPort(); - int port = configPort == null ? AkkaConstant.DEFAULT_PORT : configPort; - return String.format(AKKA_REMOTE_NODE_PATH, AkkaConstant.ACTOR_SYSTEM_NAME, ip, port, actorName); + int port = configPort == null ? RemoteConstant.DEFAULT_CLIENT_PORT : configPort; + return String.format(AKKA_REMOTE_NODE_PATH, RemoteConstant.ACTOR_SYSTEM_NAME, ip, port, actorName); } public static String getAkkaServerNodePath(String actorName) { - return String.format(AKKA_SERVER_NODE_PATH, AkkaConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); + return String.format(AKKA_SERVER_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index 066478d0..77daf022 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -21,6 +21,7 @@ import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor; import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor; import com.google.common.base.Stopwatch; import lombok.AllArgsConstructor; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; @@ -40,6 +41,7 @@ public class ProcessorRunnable implements Runnable { private InstanceInfo instanceInfo; private final ActorSelection taskTrackerActor; + @Getter private final TaskDO task; @Override diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index f1837a5f..db3b6191 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -2,7 +2,7 @@ package com.github.kfcfans.oms.worker.core.tracker.processor; import akka.actor.ActorSelection; import com.github.kfcfans.oms.worker.OhMyWorker; -import com.github.kfcfans.common.AkkaConstant; +import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable; @@ -13,7 +13,6 @@ import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportRe import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; import java.util.concurrent.*; @@ -38,6 +37,8 @@ public class ProcessorTracker { private ThreadPoolExecutor threadPool; + private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100; + /** * 创建 ProcessorTracker(其实就是创建了个执行用的线程池 T_T) */ @@ -48,7 +49,7 @@ public class ProcessorTracker { this.instanceInfo = request.getInstanceInfo(); this.instanceId = request.getInstanceInfo().getInstanceId(); this.taskTrackerAddress = request.getTaskTrackerAddress(); - String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskTrackerAddress, AkkaConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); // 初始化 @@ -68,25 +69,36 @@ public class ProcessorTracker { */ public void submitTask(TaskDO newTask) { + boolean success = false; // 1. 设置值并提交执行 newTask.setJobId(instanceInfo.getJobId()); newTask.setInstanceId(instanceInfo.getInstanceId()); newTask.setAddress(taskTrackerAddress); ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask); - threadPool.submit(processorRunnable); + try { + threadPool.submit(processorRunnable); + success = true; + }catch (RejectedExecutionException ignore) { + log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.", + instanceId, newTask.getTaskId(), newTask.getTaskName()); + }catch (Exception e) { + log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e); + } // 2. 回复接收成功 - ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq(); - reportReq.setInstanceId(instanceId); - reportReq.setTaskId(newTask.getTaskId()); - reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); + if (success) { + ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq(); + reportReq.setInstanceId(instanceId); + reportReq.setTaskId(newTask.getTaskId()); + reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); - reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); - taskTrackerActorRef.tell(reportReq, null); + reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); + taskTrackerActorRef.tell(reportReq, null); - log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.", - instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size()); + log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.", + instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size()); + } } /** @@ -101,11 +113,17 @@ public class ProcessorTracker { * 初始化线程池 */ private void initProcessorPool() { + + int poolSize = instanceInfo.getThreadConcurrency(); // 待执行队列,为了防止对内存造成较大压力,内存队列不能太大 - BlockingQueue queue = new LinkedBlockingQueue<>(); + BlockingQueue queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE); // 自定义线程池中线程名称 ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build(); - threadPool = new ThreadPoolExecutor(instanceInfo.getTaskRetryNum(), instanceInfo.getTaskRetryNum(), 60L, TimeUnit.SECONDS, queue, threadFactory); + // 拒绝策略:直接抛出异常 + RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy(); + + threadPool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler); + // 当没有任务执行时,允许销毁核心线程(即线程池最终存活线程个数可能为0) threadPool.allowCoreThreadTimeOut(true); } @@ -117,7 +135,7 @@ public class ProcessorTracker { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build(); ScheduledExecutorService timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory); - timingPool.scheduleAtFixedRate(new TimingStatusReportRunnable(), 0, 15, TimeUnit.SECONDS); + timingPool.scheduleAtFixedRate(new TimingStatusReportRunnable(), 0, 10, TimeUnit.SECONDS); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index c7f7262c..6a49852b 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -3,12 +3,12 @@ package com.github.kfcfans.oms.worker.core.tracker.task; import akka.actor.ActorSelection; import akka.pattern.Patterns; import com.github.kfcfans.common.ExecuteType; -import com.github.kfcfans.common.JobInstanceStatus; +import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.oms.worker.OhMyWorker; -import com.github.kfcfans.common.AkkaConstant; +import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.oms.worker.common.constants.CommonSJ; import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; @@ -89,7 +89,7 @@ public class TaskTracker { persistenceRootTask(); // 3. 启动定时任务(任务派发 & 状态检查) - scheduledPool.scheduleWithFixedDelay(new DispatcherRunnable(), 0, 5, TimeUnit.SECONDS); + scheduledPool.scheduleWithFixedDelay(new DispatcherRunnable(), 0, 1, TimeUnit.SECONDS); scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS); log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", req.getInstanceId(), req); @@ -100,50 +100,53 @@ public class TaskTracker { * 更新任务状态 * 任务状态机只允许数字递增 */ - public void updateTaskStatus(String instanceId, String taskId, int newStatus, @Nullable String result, boolean force) { + public void updateTaskStatus(String instanceId, String taskId, int newStatus, @Nullable String result) { boolean updateResult; TaskStatus nTaskStatus = TaskStatus.of(newStatus); - // 1. 强制模式,直接执行持久化操作(该模式状态只允许变更为非完成状态) - if (force) { - updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, nTaskStatus, result); - }else { + // 1. 读取当前 Task 状态,防止逆状态机变更的出现 + Optional dbTaskStatusOpt = taskPersistenceService.getTaskStatus(instanceId, taskId); - // 2. 读取当前 Task 状态,防止逆状态机变更的出现 - Optional dbTaskStatusOpt = taskPersistenceService.getTaskStatus(instanceId, taskId); + if (!dbTaskStatusOpt.isPresent()) { + log.warn("[TaskTracker-{}] query TaskStatus from DB failed when try to update new TaskStatus(taskId={},newStatus={}).", + instanceId, taskId, newStatus); + } - if (!dbTaskStatusOpt.isPresent()) { - log.warn("[TaskTracker-{}] query TaskStatus from DB failed when try to update new TaskStatus(taskId={},newStatus={}).", - instanceId, taskId, newStatus); - } + // 2. 数据库没查到,也允许写入(这个还需要日后仔细考虑) + if (dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH).getValue() > newStatus) { + // 必存在,但不怎么写,Java会警告... + TaskStatus dbTaskStatus = dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH); + log.warn("[TaskTracker-{}] task(taskId={},dbStatus={},requestStatus={}) status conflict, TaskTracker won't update the status.", + instanceId, taskId, dbTaskStatus, nTaskStatus); + return; + } - // 数据库没查到,也允许写入(这个还需要日后仔细考虑) - if (dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH).getValue() > newStatus) { - // 必存在,但不怎么写,Java会警告... - TaskStatus dbTaskStatus = dbTaskStatusOpt.orElse(TaskStatus.WAITING_DISPATCH); - log.warn("[TaskTracker-{}] task(taskId={},dbStatus={},requestStatus={}) status conflict, TaskTracker won't update the status.", - instanceId, taskId, dbTaskStatus, nTaskStatus); - return; - } + // 3. 失败重试处理 + if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) { - // 3. 失败重试处理 - if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) { + // 数据库查询失败的话,就只重试一次 + int failedCnt = taskPersistenceService.getTaskFailedCnt(instanceId, taskId).orElse(instanceInfo.getTaskRetryNum() - 1); + if (failedCnt < instanceInfo.getTaskRetryNum()) { - // 数据库查询失败的话,就只重试一次 - int failedCnt = taskPersistenceService.getTaskFailedCnt(instanceId, taskId).orElse(instanceInfo.getTaskRetryNum() - 1); - if (failedCnt < instanceInfo.getTaskRetryNum()) { - boolean retryTask = taskPersistenceService.updateRetryTask(instanceId, taskId, failedCnt + 1); - if (retryTask) { - log.info("[TaskTracker-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, taskId); - return; - } + TaskDO updateEntity = new TaskDO(); + updateEntity.setFailedCnt(failedCnt + 1); + updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); + updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); + + boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity); + if (retryTask) { + log.info("[TaskTracker-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, taskId); + return; } } - - // 4. 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...) - updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, nTaskStatus, result); } + // 4. 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...) + TaskDO updateEntity = new TaskDO(); + updateEntity.setStatus(nTaskStatus.getValue()); + updateEntity.setResult(result); + updateResult = taskPersistenceService.updateTask(instanceId, taskId, updateEntity); + if (!updateResult) { log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId); } @@ -180,7 +183,7 @@ public class TaskTracker { ProcessorTrackerStatus processorTrackerStatus = pTAddress2Status.get(heartbeatReq.getIp()); processorTrackerStatus.update(heartbeatReq); - + log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceInfo.getInstanceId(), processorTrackerStatus); } public boolean finished() { @@ -232,6 +235,10 @@ public class TaskTracker { @Override public void run() { + if (finished()) { + return; + } + Stopwatch stopwatch = Stopwatch.createStarted(); String instanceId = instanceInfo.getInstanceId(); @@ -251,13 +258,14 @@ public class TaskTracker { // 3. 避免大查询,分批派发任务 long currentDispatchNum = 0; - long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency(); + long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2; AtomicInteger index = new AtomicInteger(0); // 4. 循环查询数据库,获取需要派发的任务 while (maxDispatchNum > currentDispatchNum) { - List needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, DB_QUERY_LIMIT); + int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum); + List needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit); currentDispatchNum += needDispatchTasks.size(); needDispatchTasks.forEach(task -> { @@ -266,28 +274,31 @@ public class TaskTracker { // 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address String ptAddress = task.getAddress(); - if (StringUtils.isEmpty(ptAddress)) { + if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) { ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); } - String ptActorPath = AkkaUtils.getAkkaRemotePath(ptAddress, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME); + String ptActorPath = AkkaUtils.getAkkaRemotePath(ptAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath); ptActor.tell(startTaskReq, null); // 更新 ProcessorTrackerStatus 状态 pTAddress2Status.get(ptAddress).setDispatched(true); // 更新数据库(如果更新数据库失败,可能导致重复执行,先不处理) - taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, null); + TaskDO updateEntity = new TaskDO(); + updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue()); + taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity); - log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}} successfully.)", task.getInstanceId(), task.getTaskId(), task.getTaskName()); + log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", task.getInstanceId(), task.getTaskId(), task.getTaskName()); }); // 数量不足 或 查询失败,则终止循环 - if (needDispatchTasks.size() < DB_QUERY_LIMIT) { + if (needDispatchTasks.size() < dbQueryLimit) { + log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch); return; } } - log.debug("[TaskTracker-{}] dispatch {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch); + log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch); } } @@ -365,15 +376,15 @@ public class TaskTracker { finished.set(finishedBoolean); } - String serverPath = AkkaUtils.getAkkaServerNodePath(AkkaConstant.SERVER_ACTOR_NAME); + String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME); ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); - // 3. 执行成功,报告服务器 + // 3. 执行完毕,报告服务器 if (finished.get() && resultTask != null) { boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(); req.setResult(resultTask.getResult()); - req.setInstanceStatus(success ? JobInstanceStatus.SUCCEED.getValue() : JobInstanceStatus.FAILED.getValue()); + req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getValue() : InstanceStatus.FAILED.getValue()); CompletionStage askCS = Patterns.ask(serverActor, req, Duration.ofMillis(TIME_OUT_MS)); @@ -396,7 +407,7 @@ public class TaskTracker { TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq(); stopRequest.setInstanceId(instanceId); allWorkerAddress.forEach(ptIP -> { - String ptPath = AkkaUtils.getAkkaRemotePath(ptIP, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME); + String ptPath = AkkaUtils.getAkkaRemotePath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath); // 不可靠通知,ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭 ptActor.tell(stopRequest, null); @@ -410,18 +421,27 @@ public class TaskTracker { } // 4. 未完成,上报状态 - req.setInstanceStatus(JobInstanceStatus.RUNNING.getValue()); + req.setInstanceStatus(InstanceStatus.RUNNING.getValue()); serverActor.tell(req, null); - // 5.1 超时检查 -> 派发未接受的任务 + // 5.1 超时检查 -> 重试派发后未确认的任务 long currentMS = System.currentTimeMillis(); if (workerUnreceivedNum != 0) { taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> { long elapsedTime = currentMS - uncheckTask.getLastModifiedTime(); if (elapsedTime > TIME_OUT_MS) { - updateTaskStatus(instanceId, uncheckTask.getTaskId(), TaskStatus.WAITING_DISPATCH.getValue(), null, true); - log.warn("[TaskTracker-{}] task(taskId={}) try to dispatch again due to unreceived the response from processor tracker.", + + TaskDO updateEntity = new TaskDO(); + updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); + // 特殊任务只能本机执行 + if (!TaskConstant.LAST_TASK_ID.equals(uncheckTask.getTaskId())) { + updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); + } + + taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity); + + log.warn("[TaskTracker-{}] task(taskId={}) try to dispatch again due to unreceived the response from ProcessorTracker.", instanceId, uncheckTask.getTaskId()); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index 1024c9c0..26236169 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -66,16 +66,29 @@ public class TaskPersistenceService { return false; } + /** + * 依靠主键更新 Task + */ + public boolean updateTask(String instanceId, String taskId, TaskDO updateEntity) { + try { + SimpleTaskQuery query = genKeyQuery(instanceId, taskId); + return execute(() -> taskDAO.simpleUpdate(query, updateEntity)); + }catch (Exception e) { + log.error("[TaskPersistenceService] updateTask failed.", e); + } + return false; + } + /** * 获取 MapReduce 或 Broadcast 的最后一个任务 */ public Optional getLastTask(String instanceId) { try { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setTaskName(TaskConstant.LAST_TASK_NAME); return execute(() -> { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setTaskName(TaskConstant.LAST_TASK_NAME); List taskDOS = taskDAO.simpleQuery(query); if (CollectionUtils.isEmpty(taskDOS)) { return Optional.empty(); @@ -91,9 +104,9 @@ public class TaskPersistenceService { public List getAllTask(String instanceId) { try { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); return execute(() -> { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); return taskDAO.simpleQuery(query); }); }catch (Exception e) { @@ -107,14 +120,11 @@ public class TaskPersistenceService { */ public List getTaskByStatus(String instanceId, TaskStatus status, int limit) { try { - return execute(() -> { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setStatus(status.getValue()); - query.setLimit(limit); - - return taskDAO.simpleQuery(query); - }); + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setStatus(status.getValue()); + query.setLimit(limit); + return execute(() -> taskDAO.simpleQuery(query)); }catch (Exception e) { log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e); } @@ -127,13 +137,14 @@ public class TaskPersistenceService { */ public Map getTaskStatusStatistics(String instanceId) { try { - return execute(() -> { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setQueryContent("status, count(*) as num"); - query.setOtherCondition("GROUP BY status"); - List> dbRES = taskDAO.simpleQueryPlus(query); + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setQueryContent("status, count(*) as num"); + query.setOtherCondition("GROUP BY status"); + + return execute(() -> { + List> dbRES = taskDAO.simpleQueryPlus(query); Map result = Maps.newHashMap(); dbRES.forEach(row -> { // H2 数据库都是大写... @@ -149,25 +160,6 @@ public class TaskPersistenceService { return Maps.newHashMap(); } - /** - * 获取等待执行的任务数量 (ProcessorTracker侧) - * @param spInstanceId 特殊处理的 instanceId - * @return 数量 - */ - public Optional getWaitingToRunTaskNum(String spInstanceId) { - try { - return execute(() -> { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setQueryContent("count(*) as num"); - Long num = Long.parseLong(taskDAO.simpleQueryPlus(query).get(0).get("NUM").toString()); - return Optional.of(num); - }); - }catch (Exception e) { - log.error("[TaskPersistenceService] getWaitingToRunTaskNum for instance(id={}) failed.", spInstanceId, e); - } - return Optional.empty(); - } - /** * 查询 taskId -> taskResult,reduce阶段或postProcess 阶段使用 */ @@ -186,9 +178,9 @@ public class TaskPersistenceService { public Optional getTaskStatus(String instanceId, String taskId) { try { + SimpleTaskQuery query = genKeyQuery(instanceId, taskId); + query.setQueryContent("STATUS"); return execute(() -> { - SimpleTaskQuery query = genKeyQuery(instanceId, taskId); - query.setQueryContent("STATUS"); List> rows = taskDAO.simpleQueryPlus(query); return Optional.of(TaskStatus.of((int) rows.get(0).get("STATUS"))); }); @@ -204,9 +196,9 @@ public class TaskPersistenceService { public Optional getTaskFailedCnt(String instanceId, String taskId) { try { + SimpleTaskQuery query = genKeyQuery(instanceId, taskId); + query.setQueryContent("failed_cnt"); return execute(() -> { - SimpleTaskQuery query = genKeyQuery(instanceId, taskId); - query.setQueryContent("failed_cnt"); List> rows = taskDAO.simpleQueryPlus(query); // 查询成功不可能为空 return Optional.of((Integer) rows.get(0).get("FAILED_CNT")); @@ -217,23 +209,6 @@ public class TaskPersistenceService { return Optional.empty(); } - /** - * 更新 Task 的状态 - */ - public boolean updateTaskStatus(String instanceId, String taskId, TaskStatus status, String result) { - try { - return execute(() -> { - TaskDO updateEntity = new TaskDO(); - updateEntity.setStatus(status.getValue()); - updateEntity.setResult(result); - return taskDAO.simpleUpdate(genKeyQuery(instanceId, taskId), updateEntity); - }); - }catch (Exception e) { - log.error("[TaskPersistenceService] updateTaskStatus failed, instanceId={},taskId={},status={},result={}.", - instanceId, taskId, status, result, e); - } - return false; - } /** * 批量更新 Task 状态 @@ -258,23 +233,6 @@ public class TaskPersistenceService { return false; } - public boolean updateRetryTask(String instanceId, String taskId, int failedCnt) { - - try { - return execute(() -> { - TaskDO updateEntity = new TaskDO(); - updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); - // 重新选取 worker 节点重试 - updateEntity.setAddress(""); - updateEntity.setFailedCnt(failedCnt); - return taskDAO.simpleUpdate(genKeyQuery(instanceId, taskId), updateEntity); - }); - }catch (Exception e) { - log.error("[TaskPersistenceService] updateRetryTask failed, instanceId={},taskId={},failedCnt={}.", instanceId, taskId, failedCnt, e); - } - return false; - } - public boolean batchDelete(String instanceId, List taskIds) { try { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/ProcessorTrackerStatus.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/ProcessorTrackerStatus.java index 4dd712df..46b60c3f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/ProcessorTrackerStatus.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/ProcessorTrackerStatus.java @@ -25,14 +25,24 @@ public class ProcessorTrackerStatus { private long remainTaskNum; // 是否被派发过任务 private boolean dispatched; + // 是否接收到过来自 ProcessorTracker 的心跳 + private boolean connected; - public void init(String ip) { - this.ip = ip; + /** + * 初始化 ProcessorTracker,此时并未持有实际的 ProcessorTracker 状态 + */ + public void init(String ptIP) { + this.ip = ptIP; this.lastActiveTime = System.currentTimeMillis(); this.remainTaskNum = 0; this.dispatched = false; + this.connected = false; } + /** + * 接收到 ProcessorTracker 的心跳信息后,更新状态 + * @param req ProcessorTracker的心跳信息 + */ public void update(ProcessorTrackerStatusReportReq req) { // 延迟到达的请求,直接忽略 @@ -44,6 +54,7 @@ public class ProcessorTrackerStatus { this.lastActiveTime = req.getTime(); this.remainTaskNum = req.getRemainTaskNum(); this.dispatched = true; + this.connected = true; } /** @@ -56,6 +67,11 @@ public class ProcessorTrackerStatus { return true; } + // 已派发但未收到响应,则不可用 + if (!connected) { + return false; + } + // 长时间未收到心跳消息,则不可用 if (isTimeout()) { return false; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java index 62766936..d131b60e 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java @@ -4,7 +4,7 @@ import akka.actor.ActorSelection; import akka.pattern.Patterns; import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.common.ThreadLocalStore; -import com.github.kfcfans.common.AkkaConstant; +import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.persistence.TaskDO; @@ -57,7 +57,7 @@ public abstract class MapReduceProcessor implements BasicProcessor { // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) boolean requestSucceed = false; try { - String akkaRemotePath = AkkaUtils.getAkkaRemotePath(task.getAddress(), AkkaConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaRemotePath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); CompletionStage requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS)); AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java index c95bd5e0..e36e0539 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java @@ -6,7 +6,7 @@ import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.ProcessorType; import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.common.OhMyConfig; -import com.github.kfcfans.common.AkkaConstant; +import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.NetUtils; import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; @@ -36,7 +36,7 @@ public class ProcessorTrackerTest { worker.init(); ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); remoteProcessorTracker = testAS.actorSelection(akkaRemotePath); } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java index 32246303..37fc8b33 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java @@ -2,7 +2,7 @@ package com.github.kfcfans.oms; import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import com.github.kfcfans.common.AkkaConstant; +import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.ProcessorType; import com.github.kfcfans.common.request.ServerScheduleJobReq; @@ -34,7 +34,7 @@ public class TaskTrackerTest { worker.init(); ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), AkkaConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), RemoteConstant.Task_TRACKER_ACTOR_NAME); remoteTaskTracker = testAS.actorSelection(akkaRemotePath); } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java index 66d44f28..885d9188 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestMapReduceProcessor.java @@ -37,7 +37,7 @@ public class TestMapReduceProcessor extends MapReduceProcessor { if (isRootTask()) { System.out.println("start to map"); List subTasks = Lists.newLinkedList(); - for (int j = 0; j < 1; j++) { + for (int j = 0; j < 2; j++) { for (int i = 0; i < 100; i++) { int x = j * 100 + i; subTasks.add(new TestSubTask("name" + x, x)); @@ -49,7 +49,7 @@ public class TestMapReduceProcessor extends MapReduceProcessor { return new ProcessResult(true, "MAP_SUCCESS"); }else { System.out.println("start to process"); -// Thread.sleep(1000); + Thread.sleep(1000); System.out.println(context.getSubTask()); if (context.getCurrentRetryTimes() == 0) { return new ProcessResult(false, "FIRST_FAILED"); diff --git a/oh-my-scheduler-worker/src/test/resources/logback-test.xml b/oh-my-scheduler-worker/src/test/resources/logback-test.xml index 5b7d14ef..4159de7a 100644 --- a/oh-my-scheduler-worker/src/test/resources/logback-test.xml +++ b/oh-my-scheduler-worker/src/test/resources/logback-test.xml @@ -5,7 +5,8 @@ - %red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) %green([%thread]) - %cyan(%msg%n) + + %red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) - %cyan(%msg%n) UTF-8