From e99292f02719c5f7a08de545668d28aa80ad18d7 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 9 Apr 2020 15:20:06 +0800 Subject: [PATCH] finished FrequentTaskTracker and pass the test~ --- .../common/request/ServerScheduleJobReq.java | 2 +- .../oms/worker/actors/TaskTrackerActor.java | 2 +- .../core/tracker/task/CommonTaskTracker.java | 30 +-- .../tracker/task/FrequentTaskTracker.java | 240 +++++++++++++++--- .../worker/core/tracker/task/TaskTracker.java | 127 ++++++--- .../core/tracker/task/TaskTrackerPool.java | 2 +- .../persistence/TaskPersistenceService.java | 19 +- .../kfcfans/oms/worker/sdk/TaskContext.java | 1 - ...erTest.java => CommonTaskTrackerTest.java} | 43 +--- .../kfcfans/oms/FrequentTaskTrackerTest.java | 53 ++++ .../com/github/kfcfans/oms/TestUtils.java | 58 +++++ .../oms/processors/TestBasicProcessor.java | 16 +- 12 files changed, 446 insertions(+), 147 deletions(-) rename oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/{TaskTrackerTest.java => CommonTaskTrackerTest.java} (50%) create mode 100644 oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/FrequentTaskTrackerTest.java create mode 100644 oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TestUtils.java diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java index bb965392..8a350143 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java @@ -62,7 +62,7 @@ public class ServerScheduleJobReq implements Serializable { */ // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) private String timeExpressionType; - // 时间表达式,CRON/NULL/LONG/LONG + // 时间表达式,CRON/NULL/LONG/LONG(单位MS) private String timeExpression; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java index 51b27eac..a5bdc954 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java @@ -115,7 +115,7 @@ public class TaskTrackerActor extends AbstractActor { } // 原子创建,防止多实例的存在 - TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> new CommonTaskTracker(req)); + TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> TaskTracker.create(req)); } /** diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java index 0a45ec46..fd9e7915 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java @@ -39,7 +39,7 @@ public class CommonTaskTracker extends TaskTracker { // 可以是除 ROOT_TASK_ID 的任何数字 private static final String LAST_TASK_ID = "1111"; - public CommonTaskTracker(ServerScheduleJobReq req) { + protected CommonTaskTracker(ServerScheduleJobReq req) { super(req); } @@ -98,29 +98,19 @@ public class CommonTaskTracker extends TaskTracker { private void innerRun() { - Long instanceId = instanceInfo.getInstanceId(); + InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); - // 1. 查询统计信息 - Map status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId); + long finishedNum = holder.succeedNum + holder.failedNum; + long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum; - long waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L); - long workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L); - long receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L); - long runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L); - long failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L); - long succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L); - - long finishedNum = succeedNum + failedNum; - long unfinishedNum = waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum; - - log.debug("[TaskTracker-{}] status check result: {}", instanceId, status2Num); + log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder); TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq(); req.setJobId(instanceInfo.getJobId()); req.setInstanceId(instanceId); req.setTotalTaskNum(finishedNum + unfinishedNum); - req.setSucceedTaskNum(succeedNum); - req.setFailedTaskNum(failedNum); + req.setSucceedTaskNum(holder.succeedNum); + req.setFailedTaskNum(holder.failedNum); req.setReportTime(System.currentTimeMillis()); // 2. 如果未完成任务数为0,判断是否真正结束,并获取真正结束任务的执行结果 @@ -141,7 +131,7 @@ public class CommonTaskTracker extends TaskTracker { // STANDALONE 只有一个任务,完成即结束 if (executeType == ExecuteType.STANDALONE) { - List allTask = taskPersistenceService.getAllTask(instanceId); + List allTask = taskPersistenceService.getAllTask(instanceId, instanceId); if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) { log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId); }else { @@ -151,7 +141,7 @@ public class CommonTaskTracker extends TaskTracker { } else { // MapReduce 和 Broadcast 任务实例是否完成根据**Last_Task**的执行情况判断 - Optional lastTaskOptional = taskPersistenceService.getLastTask(instanceId); + Optional lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId); if (lastTaskOptional.isPresent()) { // 存在则根据 reduce 任务来判断状态 @@ -216,7 +206,7 @@ public class CommonTaskTracker extends TaskTracker { // 5.1 定期检查 -> 重试派发后未确认的任务 long currentMS = System.currentTimeMillis(); - if (workerUnreceivedNum != 0) { + if (holder.workerUnreceivedNum != 0) { taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> { long elapsedTime = currentMS - uncheckTask.getLastModifiedTime(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java index 762f0531..9e8ef59a 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java @@ -1,22 +1,28 @@ package com.github.kfcfans.oms.worker.core.tracker.task; +import akka.actor.ActorSelection; import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.InstanceStatus; -import com.github.kfcfans.common.ProcessorType; +import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.common.request.ServerScheduleJobReq; +import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; +import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.LRUCache; import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; -import javax.annotation.Nullable; +import java.util.Iterator; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -24,6 +30,8 @@ import java.util.concurrent.atomic.AtomicLong; /** * 处理秒级任务(FIX_RATE/FIX_DELAY)的TaskTracker + * FIX_RATE 直接由 ScheduledExecutorService 实现,精度高,推荐使用 + * FIX_DELAY 会有几秒的延迟,精度不是很理想 * * @author tjq * @since 2020/4/8 @@ -31,66 +39,90 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j public class FrequentTaskTracker extends TaskTracker { + // 时间表达式类型 + private TimeExpressionType timeExpressionType; + private long timeParams; + // 总运行次数(正常情况不会出现锁竞争,直接用 Atomic 系列,锁竞争验证推荐 LongAdder) - private final AtomicLong triggerTimes = new AtomicLong(0); + private AtomicLong triggerTimes; + private AtomicLong succeedTimes; + private AtomicLong failedTimes; + + // 任务发射器 + private Launcher launcher; // 保存最近10个子任务的信息,供用户查询(user -> server -> worker 传递查询) - private final LRUCache recentSubInstanceInfo = new LRUCache<>(HISTORY_SIZE); + private LRUCache recentSubInstanceInfo; // 保存运行中的任务 - private final Map subInstanceId2LastActiveTime = Maps.newConcurrentMap(); + private Map subInstanceId2TimeHolder; private static final int HISTORY_SIZE = 10; + private static final String LAST_TASK_ID_PREFIX = "L"; - public FrequentTaskTracker(ServerScheduleJobReq req) { + protected FrequentTaskTracker(ServerScheduleJobReq req) { super(req); } @Override protected void initTaskTracker(ServerScheduleJobReq req) { + // 0. 初始化实例变量 + timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); + timeParams = Long.parseLong(req.getTimeExpression()); + + triggerTimes = new AtomicLong(0); + succeedTimes = new AtomicLong(0); + failedTimes = new AtomicLong(0); + + recentSubInstanceInfo = new LRUCache<>(HISTORY_SIZE); + subInstanceId2TimeHolder = Maps.newConcurrentMap(); + // 1. 初始化定时调度线程池 ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build(); this.scheduledPool = Executors.newScheduledThreadPool(3, factory); // 2. 启动任务发射器 - Runnable launcher = new Launcher(); - long t = Long.parseLong(req.getTimeExpression()); - TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); + launcher = new Launcher(); if (timeExpressionType == TimeExpressionType.FIX_RATE) { - scheduledPool.scheduleAtFixedRate(launcher, 0, t, TimeUnit.SECONDS); + scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS); }else { - scheduledPool.scheduleWithFixedDelay(launcher, 0, t, TimeUnit.SECONDS); + scheduledPool.schedule(launcher, 0, TimeUnit.MILLISECONDS); } // 3. 启动任务分发器(事实上,秒级任务应该都是单机任务,且感觉不需要失败重试机制,那么 Dispatcher 的存在就有点浪费系统资源了...) scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 1, 2, TimeUnit.SECONDS); + // 4. 启动状态检查器 + scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(timeParams, 10000), TimeUnit.MILLISECONDS); } - - @Override - public void updateTaskStatus(String taskId, int newStatus, @Nullable String result) { - - super.updateTaskStatus(taskId, newStatus, result); - } - /** * 任务发射器(@Reference 饥荒->雪球发射器) */ private class Launcher implements Runnable { - @Override - public void run() { + public void innerRun() { + // 子任务实例ID Long subInstanceId = triggerTimes.incrementAndGet(); - subInstanceId2LastActiveTime.put(subInstanceId, System.currentTimeMillis()); + + // 记录时间 + SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder(); + timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis(); + subInstanceId2TimeHolder.put(subInstanceId, timeHolder); + + // 执行记录缓存 + SubInstanceInfo subInstanceInfo = new SubInstanceInfo(); + subInstanceInfo.status = TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue(); + subInstanceInfo.startTime = timeHolder.startTime; + recentSubInstanceInfo.put(subInstanceId, subInstanceInfo); String myAddress = OhMyWorker.getWorkerAddress(); String taskId = String.valueOf(subInstanceId); TaskDO newRootTask = new TaskDO(); + newRootTask.setInstanceId(instanceId); newRootTask.setSubInstanceId(subInstanceId); - newRootTask.setInstanceId(instanceInfo.getInstanceId()); newRootTask.setTaskId(taskId); newRootTask.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue()); @@ -101,52 +133,180 @@ public class FrequentTaskTracker extends TaskTracker { newRootTask.setCreatedTime(System.currentTimeMillis()); newRootTask.setLastModifiedTime(System.currentTimeMillis()); - // 秒级任务要求精确,先运行再说~ - dispatchTask(newRootTask, myAddress); - - // 持久化 + // 必须先持久化,持久化成功才能 Dispatch,否则会导致后续报错(因为DB中没有这个taskId对应的记录,会各种报错) if (!taskPersistenceService.save(newRootTask)) { log.error("[TaskTracker-{}] Launcher create new root task failed.", instanceId); - }else { - log.debug("[TaskTracker-{}] Launcher create new root task successfully.", instanceId); + processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED"); + return; + } + + dispatchTask(newRootTask, myAddress); + } + + @Override + public void run() { + try { + innerRun(); + }catch (Exception e) { + log.error("[TaskTracker-{}] launch task failed.", instanceId, e); } } } + /** + * 检查各个SubInstance的完成情况 + */ private class Checker implements Runnable { + private static final long HEARTBEAT_TIMEOUT_MS = 60000; + @Override public void run() { + try { + checkStatus(); + reportStatus(); + }catch (Exception e) { + log.warn("[TaskTracker-{}] check and report status failed.", instanceId, e); + } + } + + private void checkStatus() { Stopwatch stopwatch = Stopwatch.createStarted(); ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); long instanceTimeoutMS = instanceInfo.getInstanceTimeoutMS(); long nowTS = System.currentTimeMillis(); - subInstanceId2LastActiveTime.forEach((subInstanceId, lastActiveTime) -> { + Iterator> iterator = subInstanceId2TimeHolder.entrySet().iterator(); + while (iterator.hasNext()) { - long timeout = nowTS - lastActiveTime; + Map.Entry entry = iterator.next(); + Long subInstanceId = entry.getKey(); + SubInstanceTimeHolder timeHolder = entry.getValue(); - // 超时,直接判定为失败 - if (timeout > instanceTimeoutMS) { + long executeTimeout = nowTS - timeHolder.startTime; + long heartbeatTimeout = nowTS - timeHolder.lastActiveTime; - // 更新缓存数据 - if (recentSubInstanceInfo.containsKey(subInstanceId)) { - SubInstanceInfo subInstanceInfo = recentSubInstanceInfo.get(subInstanceId); - subInstanceInfo.status = InstanceStatus.FAILED.getV(); - subInstanceInfo.result = "TIMEOUT"; - } + // 超时(包含总运行时间超时和心跳包超时),直接判定为失败 + if (executeTimeout > instanceTimeoutMS || heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) { - // 删除数据库相关数据 + onFinished(subInstanceId, false, "TIMEOUT", iterator); + continue; } - }); + // 查看执行情况 + InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId); + + long finishedNum = holder.succeedNum + holder.failedNum; + long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum; + + if (unfinishedNum == 0) { + + // 数据库中没有该 subInstanceId 的记录,说明任务发射器写入DB失败,直接视为执行失败,删除数据 + if (finishedNum == 0) { + onFinished(subInstanceId, false, "LAUNCH_FAILED", iterator); + continue; + } + + // STANDALONE 代表任务确实已经执行完毕了 + if (executeType == ExecuteType.STANDALONE) { + + // 查询数据库获取结果(STANDALONE每个SubInstance只会有一条Task记录) + String result = taskPersistenceService.getAllTask(instanceId, subInstanceId).get(0).getResult(); + onFinished(subInstanceId, true, result, iterator); + continue; + } + + // MapReduce 和 BroadCast 需要根据是否有 LAST_TASK 来判断结束与否 + Optional lastTaskOptional = taskPersistenceService.getLastTask(instanceId, subInstanceId); + if (lastTaskOptional.isPresent()) { + + TaskStatus lastTaskStatus = TaskStatus.of(lastTaskOptional.get().getStatus()); + if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) { + onFinished(subInstanceId, lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS, lastTaskOptional.get().getResult(), iterator); + } + }else { + + // 创建最终任务并提交执行 + TaskDO newLastTask = new TaskDO(); + newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); + newLastTask.setTaskId(LAST_TASK_ID_PREFIX + subInstanceId); + newLastTask.setSubInstanceId(subInstanceId); + newLastTask.setAddress(OhMyWorker.getWorkerAddress()); + submitTask(Lists.newArrayList(newLastTask)); + } + } + + // 舍去一切重试机制,反正超时就失败 + + log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch.stop()); + } + } + + private void reportStatus() { + + if (StringUtils.isEmpty(OhMyWorker.getCurrentServer())) { + return; + } + + TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq(); + req.setJobId(instanceInfo.getJobId()); + req.setInstanceId(instanceId); + req.setReportTime(System.currentTimeMillis()); + req.setInstanceStatus(InstanceStatus.RUNNING.getV()); + + req.setTotalTaskNum(triggerTimes.get()); + req.setSucceedTaskNum(succeedTimes.get()); + req.setFailedTaskNum(failedTimes.get()); + req.setReportTime(System.currentTimeMillis()); + + String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); + + // 非可靠通知,Server挂掉后任务的kill工作交由其他线程去做 + serverActor.tell(req, null); + } + + /** + * 处理任务完成的情况,删除内存 & 数据库数据 + */ + private void onFinished(Long subInstanceId, boolean success, String result, Iterator iterator) { + iterator.remove(); + processFinishedSubInstance(subInstanceId, success, result); + } + } + + private void processFinishedSubInstance(long subInstanceId, boolean success, String result) { + if (success) { + succeedTimes.incrementAndGet(); + } else { + failedTimes.incrementAndGet(); + } + + // 更新缓存数据 + if (recentSubInstanceInfo.containsKey(subInstanceId)) { + SubInstanceInfo subInstanceInfo = recentSubInstanceInfo.get(subInstanceId); + subInstanceInfo.status = success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV(); + subInstanceInfo.result = result; + } + // 删除数据库相关数据 + taskPersistenceService.deleteAllSubInstanceTasks(instanceId, subInstanceId); + + // FIX_DELAY 则调度下次任务 + if (timeExpressionType == TimeExpressionType.FIX_DELAY) { + scheduledPool.schedule(launcher, timeParams, TimeUnit.MILLISECONDS); } } private static class SubInstanceInfo { private int status; + private long startTime; private String result; } + private static class SubInstanceTimeHolder { + private long startTime; + private long lastActiveTime; + } + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index cd4b34f3..9b1cae26 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.worker.core.tracker.task; import akka.actor.ActorSelection; import com.github.kfcfans.common.RemoteConstant; +import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.oms.worker.OhMyWorker; @@ -17,6 +18,7 @@ import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStopInstanceReq; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.util.CollectionUtils; @@ -24,6 +26,7 @@ import org.springframework.util.StringUtils; import javax.annotation.Nullable; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -53,7 +56,7 @@ public abstract class TaskTracker { // 是否结束 protected AtomicBoolean finished = new AtomicBoolean(false); - public TaskTracker(ServerScheduleJobReq req) { + protected TaskTracker(ServerScheduleJobReq req) { // 初始化成员变量 this.createTime = System.currentTimeMillis(); @@ -69,6 +72,20 @@ public abstract class TaskTracker { log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", req.getInstanceId(), req); } + /** + * 静态方法创建 TaskTracker + * @param req 服务端调度任务请求 + * @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker + */ + public static TaskTracker create(ServerScheduleJobReq req) { + TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); + switch (timeExpressionType) { + case FIX_RATE: + case FIX_DELAY:return new FrequentTaskTracker(req); + default:return new CommonTaskTracker(req); + } + } + /* *************************** 对外方法区 *************************** */ /** * 更新Task状态(任务状态机限定只允许状态变量递增,eg. 允许 FAILED -> SUCCEED,但不允许 SUCCEED -> FAILED) @@ -191,8 +208,53 @@ public abstract class TaskTracker { updateTaskStatus(preTaskId, status, result); } + /** + * 销毁自身,释放资源 + */ + public void destroy() { + + // 0. 开始关闭线程池,不能使用 shutdownNow(),因为 destroy 方法本身就在 scheduledPool 的线程中执行,强行关闭会打断 destroy 的执行。 + scheduledPool.shutdown(); + + // 1. 通知 ProcessorTracker 释放资源 + Long instanceId = instanceInfo.getInstanceId(); + TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq(); + stopRequest.setInstanceId(instanceId); + ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> { + String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); + ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath); + // 不可靠通知,ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭 + ptActor.tell(stopRequest, null); + }); + + // 2. 删除所有数据库数据 + boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId); + if (!dbSuccess) { + log.warn("[TaskTracker-{}] delete tasks from database failed.", instanceId); + taskPersistenceService.deleteAllTasks(instanceId); + }else { + log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId); + } + + // 3. 移除顶层引用,送去 GC + TaskTrackerPool.remove(instanceId); + + log.info("[TaskTracker-{}] TaskTracker has left the world, bye~", instanceId); + + // 4. 强制关闭线程池 + if (!scheduledPool.isTerminated()) { + CommonUtils.executeIgnoreException(() -> scheduledPool.shutdownNow()); + } + + } + /* *************************** 对内方法区 *************************** */ + /** + * 派发任务到 ProcessorTracker + * @param task 需要被执行的任务 + * @param processorTrackerAddress ProcessorTracker的地址(IP:Port) + */ protected void dispatchTask(TaskDO task, String processorTrackerAddress) { TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task); @@ -212,42 +274,26 @@ public abstract class TaskTracker { } /** - * 销毁自身,释放资源 + * 获取任务实例产生的各个Task状态,用于分析任务实例执行情况 + * @param subInstanceId 子任务实例ID + * @return InstanceStatisticsHolder */ - protected void destroy() { + protected InstanceStatisticsHolder getInstanceStatisticsHolder(long subInstanceId) { - // 0. 先关闭定时任务线程池,防止任务被派发出去 - CommonUtils.executeIgnoreException(() -> { - // 不能使用 shutdownNow(),因为 destroy 方法本身就在 scheduledPool 的线程中执行,强行关闭会打断 destroy 的执行。 - scheduledPool.shutdown(); - return null; - }); + Map status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId); + InstanceStatisticsHolder holder = new InstanceStatisticsHolder(); - // 1. 通知 ProcessorTracker 释放资源 - Long instanceId = instanceInfo.getInstanceId(); - TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq(); - stopRequest.setInstanceId(instanceId); - ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> { - String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); - ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath); - // 不可靠通知,ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭 - ptActor.tell(stopRequest, null); - }); - - // 2. 删除所有数据库数据 - boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId); - if (!dbSuccess) { - log.warn("[TaskTracker-{}] delete tasks from database failed.", instanceId); - }else { - log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId); - } - - // 3. 移除顶层引用,送去 GC - TaskTrackerPool.remove(instanceId); - - log.info("[TaskTracker-{}] TaskTracker has left the world, bye~", instanceId); + holder.waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L); + holder.workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L); + holder.receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L); + holder.runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L); + holder.failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L); + holder.succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L); + return holder; } + + /** * 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去 */ @@ -307,6 +353,23 @@ public abstract class TaskTracker { } } + /** + * 存储任务实例产生的各个Task状态,用于分析任务实例执行情况 + */ + @Data + protected static class InstanceStatisticsHolder { + // 等待派发状态(仅存在 TaskTracker 数据库中) + protected long waitingDispatchNum; + // 已派发,但 ProcessorTracker 未确认,可能由于网络错误请求未送达,也有可能 ProcessorTracker 线程池满,拒绝执行 + protected long workerUnreceivedNum; + // ProcessorTracker确认接收,存在与线程池队列中,排队执行 + protected long receivedNum; + // ProcessorTracker正在执行 + protected long runningNum; + protected long failedNum; + protected long succeedNum; + } + /** * 初始化 TaskTracker * @param req 服务器调度任务实例运行请求 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java index d5c6313b..bc397be9 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTrackerPool.java @@ -16,7 +16,7 @@ public class TaskTrackerPool { private static final Map instanceId2TaskTracker = Maps.newConcurrentMap(); /** - * 获取 ProcessorTracker,如果不存在则创建 + * 获取 TaskTracker */ public static TaskTracker getTaskTrackerPool(Long instanceId) { return instanceId2TaskTracker.get(instanceId); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index 782c6c92..c55130f8 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -62,7 +62,7 @@ public class TaskPersistenceService { try { return execute(() -> taskDAO.batchSave(tasks)); }catch (Exception e) { - log.error("[TaskPersistenceService] batchSave tasks failed.", e); + log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e); } return false; } @@ -110,11 +110,12 @@ public class TaskPersistenceService { /** * 获取 MapReduce 或 Broadcast 的最后一个任务 */ - public Optional getLastTask(Long instanceId) { + public Optional getLastTask(Long instanceId, Long subInstanceId) { try { SimpleTaskQuery query = new SimpleTaskQuery(); query.setInstanceId(instanceId); + query.setSubInstanceId(subInstanceId); query.setTaskName(TaskConstant.LAST_TASK_NAME); return execute(() -> { List taskDOS = taskDAO.simpleQuery(query); @@ -130,13 +131,12 @@ public class TaskPersistenceService { return Optional.empty(); } - public List getAllTask(Long instanceId) { + public List getAllTask(Long instanceId, Long subInstanceId) { try { SimpleTaskQuery query = new SimpleTaskQuery(); query.setInstanceId(instanceId); - return execute(() -> { - return taskDAO.simpleQuery(query); - }); + query.setSubInstanceId(subInstanceId); + return execute(() -> taskDAO.simpleQuery(query)); }catch (Exception e) { log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e); } @@ -163,11 +163,12 @@ public class TaskPersistenceService { * 获取 TaskTracker 管理的子 task 状态统计信息 * TaskStatus -> num */ - public Map getTaskStatusStatistics(Long instanceId) { + public Map getTaskStatusStatistics(Long instanceId, Long subInstanceId) { try { SimpleTaskQuery query = new SimpleTaskQuery(); query.setInstanceId(instanceId); + query.setSubInstanceId(subInstanceId); query.setQueryContent("status, count(*) as num"); query.setOtherCondition("GROUP BY status"); @@ -201,7 +202,7 @@ public class TaskPersistenceService { } /** - * 查询任务状态(只查询 status,节约 I/O 资源 -> 测试表明,效果惊人...磁盘I/O果然是重要瓶颈...) + * 查询任务状态(只查询 status,节约 I/O 资源 -> 测试表明,在(我高端的NVMe)SSD上都效果惊人...别说一般的HDD了...磁盘I/O果然是重要瓶颈...) */ public Optional getTaskStatus(Long instanceId, String taskId) { @@ -277,7 +278,7 @@ public class TaskPersistenceService { try { SimpleTaskQuery condition = new SimpleTaskQuery(); condition.setInstanceId(instanceId); - + condition.setSubInstanceId(subInstanceId); return execute(() -> taskDAO.simpleDelete(condition)); }catch (Exception e) { log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java index d5faf1de..5def3831 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java @@ -1,6 +1,5 @@ package com.github.kfcfans.oms.worker.sdk; -import lombok.Data; import lombok.Getter; import lombok.Setter; diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java similarity index 50% rename from oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java rename to oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java index eee04ded..97957db0 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java @@ -4,8 +4,7 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.ExecuteType; -import com.github.kfcfans.common.ProcessorType; -import com.github.kfcfans.common.request.ServerScheduleJobReq; +import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.common.OhMyConfig; import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; @@ -21,7 +20,7 @@ import org.junit.jupiter.api.Test; * @author tjq * @since 2020/3/25 */ -public class TaskTrackerTest { +public class CommonTaskTrackerTest { private static ActorSelection remoteTaskTracker; @@ -48,48 +47,14 @@ public class TaskTrackerTest { @Test public void testStandaloneJob() throws Exception { - remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.STANDALONE), null); + remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.CRON), null); Thread.sleep(5000000); } @Test public void testMapReduceJob() throws Exception { - remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.MAP_REDUCE), null); + remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.MAP_REDUCE, TimeExpressionType.CRON), null); Thread.sleep(5000000); } - private static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType) { - ServerScheduleJobReq req = new ServerScheduleJobReq(); - - req.setJobId(1L); - req.setInstanceId(10086L); - req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT)); - - req.setJobParams("this is job Params"); - req.setInstanceParams("this is instance Params"); - req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name()); - req.setTaskRetryNum(3); - req.setThreadConcurrency(20); - req.setInstanceTimeoutMS(500000); - req.setTaskTimeoutMS(500000); - - switch (executeType) { - case STANDALONE: - req.setExecuteType(ExecuteType.STANDALONE.name()); - req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor"); - break; - case MAP_REDUCE: - req.setExecuteType(ExecuteType.MAP_REDUCE.name()); - req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor"); - break; - case BROADCAST: - req.setExecuteType(ExecuteType.BROADCAST.name()); - req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor"); - break; - } - - return req; - } - - } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/FrequentTaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/FrequentTaskTrackerTest.java new file mode 100644 index 00000000..c378a27a --- /dev/null +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/FrequentTaskTrackerTest.java @@ -0,0 +1,53 @@ +package com.github.kfcfans.oms; + +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.common.RemoteConstant; +import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.common.utils.NetUtils; +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.OhMyConfig; +import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; +import com.google.common.collect.Lists; +import com.typesafe.config.ConfigFactory; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * description + * + * @author tjq + * @since 2020/4/9 + */ +public class FrequentTaskTrackerTest { + + private static ActorSelection remoteTaskTracker; + + @BeforeAll + public static void init() throws Exception { + + OhMyConfig ohMyConfig = new OhMyConfig(); + ohMyConfig.setAppName("oms-test"); + ohMyConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700")); + OhMyWorker worker = new OhMyWorker(); + worker.setConfig(ohMyConfig); + worker.init(); + + ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.Task_TRACKER_ACTOR_NAME); + remoteTaskTracker = testAS.actorSelection(akkaRemotePath); + } + + @Test + public void testFixRateJob() throws Exception { + remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.FIX_RATE), null); + Thread.sleep(5000000); + } + + @Test + public void testFixDelayJob() throws Exception { + remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.MAP_REDUCE, TimeExpressionType.FIX_DELAY), null); + Thread.sleep(5000000); + } +} diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TestUtils.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TestUtils.java new file mode 100644 index 00000000..0fae00f3 --- /dev/null +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TestUtils.java @@ -0,0 +1,58 @@ +package com.github.kfcfans.oms; + +import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.common.ProcessorType; +import com.github.kfcfans.common.RemoteConstant; +import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.common.request.ServerScheduleJobReq; +import com.github.kfcfans.common.utils.NetUtils; +import com.google.common.collect.Lists; + +/** + * 测试需要用到的工具类 + * + * @author tjq + * @since 2020/4/9 + */ +public class TestUtils { + + public static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType, TimeExpressionType timeExpressionType) { + ServerScheduleJobReq req = new ServerScheduleJobReq(); + + req.setJobId(1L); + req.setInstanceId(10086L); + req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT)); + + req.setJobParams("JobParams"); + req.setInstanceParams("InstanceParams"); + req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name()); + req.setTaskRetryNum(3); + req.setThreadConcurrency(10); + req.setInstanceTimeoutMS(500000); + req.setTaskTimeoutMS(500000); + req.setTimeExpressionType(timeExpressionType.name()); + switch (timeExpressionType) { + case CRON:req.setTimeExpression("0 * * * * ? "); + case FIX_RATE: + case FIX_DELAY:req.setTimeExpression("5000"); + } + + switch (executeType) { + case STANDALONE: + req.setExecuteType(ExecuteType.STANDALONE.name()); + req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor"); + break; + case MAP_REDUCE: + req.setExecuteType(ExecuteType.MAP_REDUCE.name()); + req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor"); + break; + case BROADCAST: + req.setExecuteType(ExecuteType.BROADCAST.name()); + req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor"); + break; + } + + return req; + } + +} diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java index 55d24878..63b1442a 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBasicProcessor.java @@ -1,8 +1,16 @@ package com.github.kfcfans.oms.processors; +import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.common.ProcessorType; +import com.github.kfcfans.common.RemoteConstant; +import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.common.request.ServerScheduleJobReq; +import com.github.kfcfans.common.utils.NetUtils; import com.github.kfcfans.oms.worker.sdk.ProcessResult; import com.github.kfcfans.oms.worker.sdk.TaskContext; import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor; +import com.google.common.collect.Lists; /** * 测试用的基础处理器 @@ -14,8 +22,10 @@ public class TestBasicProcessor implements BasicProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { - System.out.println("==== ProcessResult#process"); - System.out.println("TaskContext: " + context.toString()); - return new ProcessResult(true, "success"); + System.out.println("======== BasicProcessor#process ========"); + System.out.println("TaskContext: " + JSONObject.toJSONString(context) + ";time = " + System.currentTimeMillis()); + return new ProcessResult(true, System.currentTimeMillis() + "success"); } + + }