finished FrequentTaskTracker and passed the test~

This commit is contained in:
tjq 2020-04-09 15:20:06 +08:00
parent 43cc6360e8
commit e99292f027
12 changed files with 446 additions and 147 deletions

View File

@@ -62,7 +62,7 @@ public class ServerScheduleJobReq implements Serializable {
*/
// Time expression type (CRON/API/FIX_RATE/FIX_DELAY)
private String timeExpressionType;
// Time expression (CRON/NULL/LONG/LONG)
// Time expression (CRON/NULL/LONG/LONG, unit: ms)
private String timeExpression;
}

View File

@@ -115,7 +115,7 @@ public class TaskTrackerActor extends AbstractActor {
}
// Create atomically to prevent multiple instances from existing
TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> new CommonTaskTracker(req));
TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> TaskTracker.create(req));
}
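The "atomic creation" above is what keeps two concurrent ServerScheduleJobReq messages for the same instanceId from spawning two trackers. A minimal sketch of how TaskTrackerPool can provide that guarantee on top of its ConcurrentMap (the method body is an assumption; only the pool field is visible later in this commit):

// Assumed implementation sketch: ConcurrentHashMap.computeIfAbsent invokes the
// factory (a java.util.function.Function) at most once per key, even when
// several threads race on the same instanceId.
public static void atomicCreateTaskTracker(Long instanceId, Function<Long, TaskTracker> creator) {
    instanceId2TaskTracker.computeIfAbsent(instanceId, creator);
}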
/**

View File

@@ -39,7 +39,7 @@ public class CommonTaskTracker extends TaskTracker {
// Can be any number other than ROOT_TASK_ID
private static final String LAST_TASK_ID = "1111";
public CommonTaskTracker(ServerScheduleJobReq req) {
protected CommonTaskTracker(ServerScheduleJobReq req) {
super(req);
}
@@ -98,29 +98,19 @@ public class CommonTaskTracker extends TaskTracker {
private void innerRun() {
Long instanceId = instanceInfo.getInstanceId();
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
// 1. Query the statistics
Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId);
long finishedNum = holder.succeedNum + holder.failedNum;
long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;
long waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L);
long workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L);
long receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L);
long runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L);
long failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L);
long succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L);
long finishedNum = succeedNum + failedNum;
long unfinishedNum = waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum;
log.debug("[TaskTracker-{}] status check result: {}", instanceId, status2Num);
log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder);
TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
req.setJobId(instanceInfo.getJobId());
req.setInstanceId(instanceId);
req.setTotalTaskNum(finishedNum + unfinishedNum);
req.setSucceedTaskNum(succeedNum);
req.setFailedTaskNum(failedNum);
req.setSucceedTaskNum(holder.succeedNum);
req.setFailedTaskNum(holder.failedNum);
req.setReportTime(System.currentTimeMillis());
// 2. If the number of unfinished tasks is 0, check whether the instance has truly finished (and fetch the final task's execution result)
@@ -141,7 +131,7 @@ public class CommonTaskTracker extends TaskTracker {
// STANDALONE: there is only one task, so the instance finishes as soon as that task completes
if (executeType == ExecuteType.STANDALONE) {
List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId);
List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
log.warn("[TaskTracker-{}] there must be some bug in TaskTracker.", instanceId);
}else {
@@ -151,7 +141,7 @@ public class CommonTaskTracker extends TaskTracker {
} else {
// For MapReduce / Broadcast, whether the instance has finished is judged by the execution status of the **Last_Task**
Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId);
Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
if (lastTaskOptional.isPresent()) {
// If present, the status is determined by the reduce task
@@ -216,7 +206,7 @@ public class CommonTaskTracker extends TaskTracker {
// 5.1 Periodic check -> retry tasks that were dispatched but never acknowledged
long currentMS = System.currentTimeMillis();
if (workerUnreceivedNum != 0) {
if (holder.workerUnreceivedNum != 0) {
taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();

View File

@@ -1,22 +1,28 @@
package com.github.kfcfans.oms.worker.core.tracker.task;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.LRUCache;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -24,6 +30,8 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* TaskTracker for second-level tasks (FIX_RATE / FIX_DELAY)
* FIX_RATE is implemented directly by ScheduledExecutorService: high precision, recommended
* FIX_DELAY incurs a delay of a few seconds, so its precision is not ideal
*
* @author tjq
* @since 2020/4/8
@@ -31,66 +39,90 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class FrequentTaskTracker extends TaskTracker {
// Time expression type
private TimeExpressionType timeExpressionType;
private long timeParams;
// Total trigger count; no lock contention is expected in the normal case, so the plain Atomic classes are used (under verified heavy contention LongAdder is recommended, see the sketch after this class)
private final AtomicLong triggerTimes = new AtomicLong(0);
private AtomicLong triggerTimes;
private AtomicLong succeedTimes;
private AtomicLong failedTimes;
// Task launcher
private Launcher launcher;
// Keeps the info of the 10 most recent sub-instances for user queries (queries travel user -> server -> worker)
private final LRUCache<Long, SubInstanceInfo> recentSubInstanceInfo = new LRUCache<>(HISTORY_SIZE);
private LRUCache<Long, SubInstanceInfo> recentSubInstanceInfo;
// Keeps the running sub-instances
private final Map<Long, Long> subInstanceId2LastActiveTime = Maps.newConcurrentMap();
private Map<Long, SubInstanceTimeHolder> subInstanceId2TimeHolder;
private static final int HISTORY_SIZE = 10;
private static final String LAST_TASK_ID_PREFIX = "L";
public FrequentTaskTracker(ServerScheduleJobReq req) {
protected FrequentTaskTracker(ServerScheduleJobReq req) {
super(req);
}
@Override
protected void initTaskTracker(ServerScheduleJobReq req) {
// 0. Initialize instance fields
timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
timeParams = Long.parseLong(req.getTimeExpression());
triggerTimes = new AtomicLong(0);
succeedTimes = new AtomicLong(0);
failedTimes = new AtomicLong(0);
recentSubInstanceInfo = new LRUCache<>(HISTORY_SIZE);
subInstanceId2TimeHolder = Maps.newConcurrentMap();
// 1. Initialize the scheduled thread pool
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build();
this.scheduledPool = Executors.newScheduledThreadPool(3, factory);
// 2. Start the task launcher
Runnable launcher = new Launcher();
long t = Long.parseLong(req.getTimeExpression());
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
launcher = new Launcher();
if (timeExpressionType == TimeExpressionType.FIX_RATE) {
scheduledPool.scheduleAtFixedRate(launcher, 0, t, TimeUnit.SECONDS);
scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS);
}else {
scheduledPool.scheduleWithFixedDelay(launcher, 0, t, TimeUnit.SECONDS);
scheduledPool.schedule(launcher, 0, TimeUnit.MILLISECONDS);
}
// 3. Start the task dispatcher (in fact, second-level tasks should all be standalone and arguably don't need a failure-retry mechanism, which makes the Dispatcher a bit of a waste of system resources...)
scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 1, 2, TimeUnit.SECONDS);
// 4. Start the status checker
scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(timeParams, 10000), TimeUnit.MILLISECONDS);
}
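This branch is where the precision gap described in the class javadoc comes from; a condensed, illustrative restatement of the two modes (comments added, logic as in the diff above):

if (timeExpressionType == TimeExpressionType.FIX_RATE) {
    // the pool itself keeps the cadence, so precision is high
    scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS);
} else {
    // FIX_DELAY: fire once now; the next shot is only armed in processFinishedSubInstance(),
    // after the Checker (period of up to 10s) has noticed completion, hence the seconds of slack
    scheduledPool.schedule(launcher, 0, TimeUnit.MILLISECONDS);
}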
@Override
public void updateTaskStatus(String taskId, int newStatus, @Nullable String result) {
super.updateTaskStatus(taskId, newStatus, result);
}
/**
* Task launcher (@Reference: the snowball launcher in Don't Starve)
*/
private class Launcher implements Runnable {
@Override
public void run() {
public void innerRun() {
// Sub-instance ID
Long subInstanceId = triggerTimes.incrementAndGet();
subInstanceId2LastActiveTime.put(subInstanceId, System.currentTimeMillis());
// Record the timestamps
SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder();
timeHolder.startTime = timeHolder.lastActiveTime = System.currentTimeMillis();
subInstanceId2TimeHolder.put(subInstanceId, timeHolder);
// Cache the execution record
SubInstanceInfo subInstanceInfo = new SubInstanceInfo();
subInstanceInfo.status = TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue();
subInstanceInfo.startTime = timeHolder.startTime;
recentSubInstanceInfo.put(subInstanceId, subInstanceInfo);
String myAddress = OhMyWorker.getWorkerAddress();
String taskId = String.valueOf(subInstanceId);
TaskDO newRootTask = new TaskDO();
newRootTask.setInstanceId(instanceId);
newRootTask.setSubInstanceId(subInstanceId);
newRootTask.setInstanceId(instanceInfo.getInstanceId());
newRootTask.setTaskId(taskId);
newRootTask.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
@@ -101,52 +133,180 @@ public class FrequentTaskTracker extends TaskTracker {
newRootTask.setCreatedTime(System.currentTimeMillis());
newRootTask.setLastModifiedTime(System.currentTimeMillis());
// Second-level tasks demand precision: run first, sort out the rest later
dispatchTask(newRootTask, myAddress);
// Persist
// Must persist first and dispatch only once persistence succeeds; otherwise later steps break (the DB has no record for this taskId, which triggers all sorts of errors)
if (!taskPersistenceService.save(newRootTask)) {
log.error("[TaskTracker-{}] Launcher create new root task failed.", instanceId);
}else {
log.debug("[TaskTracker-{}] Launcher create new root task successfully.", instanceId);
processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED");
return;
}
dispatchTask(newRootTask, myAddress);
}
@Override
public void run() {
try {
innerRun();
}catch (Exception e) {
log.error("[TaskTracker-{}] launch task failed.", instanceId, e);
}
}
}
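The run()/innerRun() split is not cosmetic: if a task handed to scheduleAtFixedRate/scheduleWithFixedDelay lets an exception escape, the executor silently cancels all subsequent executions. Catching everything in run() keeps the Launcher firing. A tiny standalone demo of the failure mode (illustrative only):

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.scheduleAtFixedRate(() -> {
    throw new RuntimeException("boom");  // the first run throws...
}, 0, 1, TimeUnit.SECONDS);              // ...and the task is never executed again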
/**
* Checks the completion status of every SubInstance
*/
private class Checker implements Runnable {
private static final long HEARTBEAT_TIMEOUT_MS = 60000;
@Override
public void run() {
try {
checkStatus();
reportStatus();
}catch (Exception e) {
log.warn("[TaskTracker-{}] check and report status failed.", instanceId, e);
}
}
private void checkStatus() {
Stopwatch stopwatch = Stopwatch.createStarted();
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
long instanceTimeoutMS = instanceInfo.getInstanceTimeoutMS();
long nowTS = System.currentTimeMillis();
subInstanceId2LastActiveTime.forEach((subInstanceId, lastActiveTime) -> {
Iterator<Map.Entry<Long, SubInstanceTimeHolder>> iterator = subInstanceId2TimeHolder.entrySet().iterator();
while (iterator.hasNext()) {
long timeout = nowTS - lastActiveTime;
Map.Entry<Long, SubInstanceTimeHolder> entry = iterator.next();
Long subInstanceId = entry.getKey();
SubInstanceTimeHolder timeHolder = entry.getValue();
// Timed out; judged as failed outright
if (timeout > instanceTimeoutMS) {
long executeTimeout = nowTS - timeHolder.startTime;
long heartbeatTimeout = nowTS - timeHolder.lastActiveTime;
// Update the cached data
if (recentSubInstanceInfo.containsKey(subInstanceId)) {
SubInstanceInfo subInstanceInfo = recentSubInstanceInfo.get(subInstanceId);
subInstanceInfo.status = InstanceStatus.FAILED.getV();
subInstanceInfo.result = "TIMEOUT";
}
// A timeout (either total-runtime timeout or heartbeat timeout) is judged as failure outright
if (executeTimeout > instanceTimeoutMS || heartbeatTimeout > HEARTBEAT_TIMEOUT_MS) {
// Delete the related database data
onFinished(subInstanceId, false, "TIMEOUT", iterator);
continue;
}
});
// Inspect the execution progress
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId);
long finishedNum = holder.succeedNum + holder.failedNum;
long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;
if (unfinishedNum == 0) {
// No record for this subInstanceId in the database means the Launcher failed to write it; treat it as an execution failure and delete the data
if (finishedNum == 0) {
onFinished(subInstanceId, false, "LAUNCH_FAILED", iterator);
continue;
}
// STANDALONE: the task really has finished executing
if (executeType == ExecuteType.STANDALONE) {
// Query the database for the result (in STANDALONE mode each SubInstance has exactly one Task record)
String result = taskPersistenceService.getAllTask(instanceId, subInstanceId).get(0).getResult();
onFinished(subInstanceId, true, result, iterator);
continue;
}
// MapReduce / Broadcast: completion is judged by whether the LAST_TASK exists
Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, subInstanceId);
if (lastTaskOptional.isPresent()) {
TaskStatus lastTaskStatus = TaskStatus.of(lastTaskOptional.get().getStatus());
if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
onFinished(subInstanceId, lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS, lastTaskOptional.get().getResult(), iterator);
}
}else {
// Create the final task and submit it for execution
TaskDO newLastTask = new TaskDO();
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
newLastTask.setTaskId(LAST_TASK_ID_PREFIX + subInstanceId);
newLastTask.setSubInstanceId(subInstanceId);
newLastTask.setAddress(OhMyWorker.getWorkerAddress());
submitTask(Lists.newArrayList(newLastTask));
}
}
// All retry mechanisms are dropped; a timeout simply means failure
log.debug("[TaskTracker-{}] check status using {}.", instanceId, stopwatch.stop());
}
}
private void reportStatus() {
if (StringUtils.isEmpty(OhMyWorker.getCurrentServer())) {
return;
}
TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
req.setJobId(instanceInfo.getJobId());
req.setInstanceId(instanceId);
req.setReportTime(System.currentTimeMillis());
req.setInstanceStatus(InstanceStatus.RUNNING.getV());
req.setTotalTaskNum(triggerTimes.get());
req.setSucceedTaskNum(succeedTimes.get());
req.setFailedTaskNum(failedTimes.get());
req.setReportTime(System.currentTimeMillis());
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
// Unreliable notification; if the Server goes down, killing the task is left to other threads
serverActor.tell(req, null);
}
/**
* Handles a finished sub-instance: removes its in-memory & database data
*/
private void onFinished(Long subInstanceId, boolean success, String result, Iterator<?> iterator) {
iterator.remove();
processFinishedSubInstance(subInstanceId, success, result);
}
}
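Note that onFinished removes the map entry through the live Iterator handed in by checkStatus() rather than calling subInstanceId2TimeHolder.remove(subInstanceId) directly, which keeps the removal safe while the entry set is still being traversed. The pattern in isolation (illustrative; finished() is a hypothetical predicate):

Iterator<Map.Entry<Long, SubInstanceTimeHolder>> it = subInstanceId2TimeHolder.entrySet().iterator();
while (it.hasNext()) {
    Map.Entry<Long, SubInstanceTimeHolder> entry = it.next();
    if (finished(entry.getKey())) {  // hypothetical predicate
        it.remove();                 // removing via the iterator is safe mid-iteration
    }
}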
private void processFinishedSubInstance(long subInstanceId, boolean success, String result) {
if (success) {
succeedTimes.incrementAndGet();
} else {
failedTimes.incrementAndGet();
}
// Update the cached data
if (recentSubInstanceInfo.containsKey(subInstanceId)) {
SubInstanceInfo subInstanceInfo = recentSubInstanceInfo.get(subInstanceId);
subInstanceInfo.status = success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV();
subInstanceInfo.result = result;
}
// Delete the related database data
taskPersistenceService.deleteAllSubInstanceTasks(instanceId, subInstanceId);
// For FIX_DELAY, schedule the next run
if (timeExpressionType == TimeExpressionType.FIX_DELAY) {
scheduledPool.schedule(launcher, timeParams, TimeUnit.MILLISECONDS);
}
}
private static class SubInstanceInfo {
private int status;
private long startTime;
private String result;
}
private static class SubInstanceTimeHolder {
private long startTime;
private long lastActiveTime;
}
}
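On the counter comment at the top of this class: AtomicLong CAS-loops on a single cell, which is cheap when, as here, a lone Launcher thread does the incrementing; LongAdder stripes the count across padded cells and only pays off under real multi-threaded contention. A minimal illustration of the trade-off (not project code):

// Heavy contention: LongAdder spreads increments across cells.
LongAdder contended = new LongAdder();
contended.increment();
long approx = contended.sum();  // sum() is only weakly consistent across cells

// Single writer (this class's situation): a plain AtomicLong is simpler and exact.
AtomicLong single = new AtomicLong(0);
long next = single.incrementAndGet();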

View File

@@ -2,6 +2,7 @@ package com.github.kfcfans.oms.worker.core.tracker.task;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.OhMyWorker;
@@ -17,6 +18,7 @@ import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStopInstanceReq;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.CollectionUtils;
@@ -24,6 +26,7 @@ import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,7 +56,7 @@ public abstract class TaskTracker {
// Whether the instance has finished
protected AtomicBoolean finished = new AtomicBoolean(false);
public TaskTracker(ServerScheduleJobReq req) {
protected TaskTracker(ServerScheduleJobReq req) {
// Initialize member fields
this.createTime = System.currentTimeMillis();
@@ -69,6 +72,20 @@ public abstract class TaskTracker {
log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", req.getInstanceId(), req);
}
/**
* Static factory method for creating a TaskTracker
* @param req the server's job-scheduling request
* @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker
*/
public static TaskTracker create(ServerScheduleJobReq req) {
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
switch (timeExpressionType) {
case FIX_RATE:
case FIX_DELAY: return new FrequentTaskTracker(req);
default: return new CommonTaskTracker(req);
}
}
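Usage, as wired up in TaskTrackerActor earlier in this commit:

TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> TaskTracker.create(req));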
/* *************************** Public methods *************************** */
/**
* Updates a Task's status; the task state machine only allows the status value to increase (e.g. FAILED -> SUCCEED is allowed, but SUCCEED -> FAILED is not)
@@ -191,8 +208,53 @@ public abstract class TaskTracker {
updateTaskStatus(preTaskId, status, result);
}
/**
* Destroys this TaskTracker and releases its resources
*/
public void destroy() {
// 0. Begin shutting down the thread pool (shutdownNow() must not be used: destroy() itself runs on a scheduledPool thread, and a forced shutdown would interrupt destroy()'s own execution)
scheduledPool.shutdown();
// 1. Notify the ProcessorTrackers to release resources
Long instanceId = instanceInfo.getInstanceId();
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
stopRequest.setInstanceId(instanceId);
ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> {
String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath);
// Unreliable notification; a ProcessorTracker can also shut itself down via its own timers / polling
ptActor.tell(stopRequest, null);
});
// 2. Delete all database data
boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId);
if (!dbSuccess) {
log.warn("[TaskTracker-{}] delete tasks from database failed.", instanceId);
taskPersistenceService.deleteAllTasks(instanceId);
}else {
log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId);
}
// 3. Remove the top-level reference so the tracker can be GC'd
TaskTrackerPool.remove(instanceId);
log.info("[TaskTracker-{}] TaskTracker has left the world, bye~", instanceId);
// 4. Force-close the thread pool
if (!scheduledPool.isTerminated()) {
CommonUtils.executeIgnoreException(() -> scheduledPool.shutdownNow());
}
}
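The step-0 comment deserves a concrete illustration: shutdownNow() interrupts every worker thread of the pool, including the one currently running destroy(), so the remaining cleanup (actor notifications, DB deletes) could be cut short. A tiny standalone demo of the self-interrupt (illustrative only):

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.schedule(() -> {
    pool.shutdownNow();  // interrupts all workers, including this very thread
    System.out.println("self-interrupted: " + Thread.currentThread().isInterrupted());  // true
}, 0, TimeUnit.SECONDS);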
/* *************************** Internal methods *************************** */
/**
* Dispatches a task to a ProcessorTracker
* @param task the task to execute
* @param processorTrackerAddress the ProcessorTracker's address (IP:Port)
*/
protected void dispatchTask(TaskDO task, String processorTrackerAddress) {
TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task);
@@ -212,42 +274,26 @@ public abstract class TaskTracker {
}
/**
* Destroys this TaskTracker and releases its resources
* Collects the status counts of the Tasks produced by the instance, used to analyze how the instance is executing
* @param subInstanceId sub-instance ID
* @return InstanceStatisticsHolder
*/
protected void destroy() {
protected InstanceStatisticsHolder getInstanceStatisticsHolder(long subInstanceId) {
// 0. Shut down the scheduled thread pool first so no more tasks get dispatched
CommonUtils.executeIgnoreException(() -> {
// shutdownNow() must not be used: destroy() itself runs on a scheduledPool thread, and a forced shutdown would interrupt destroy()'s own execution
scheduledPool.shutdown();
return null;
});
Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId);
InstanceStatisticsHolder holder = new InstanceStatisticsHolder();
// 1. Notify the ProcessorTrackers to release resources
Long instanceId = instanceInfo.getInstanceId();
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
stopRequest.setInstanceId(instanceId);
ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> {
String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath);
// Unreliable notification; a ProcessorTracker can also shut itself down via its own timers / polling
ptActor.tell(stopRequest, null);
});
// 2. Delete all database data
boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId);
if (!dbSuccess) {
log.warn("[TaskTracker-{}] delete tasks from database failed.", instanceId);
}else {
log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId);
}
// 3. Remove the top-level reference so the tracker can be GC'd
TaskTrackerPool.remove(instanceId);
log.info("[TaskTracker-{}] TaskTracker has left the world, bye~", instanceId);
holder.waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L);
holder.workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L);
holder.receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L);
holder.runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L);
holder.failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L);
holder.succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L);
return holder;
}
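For reference, this is exactly how CommonTaskTracker.innerRun() above consumes the holder:

InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
long finishedNum = holder.succeedNum + holder.failedNum;
long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;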
/**
* Periodically scans the tasks in the database (at most 100 per scan, to bound memory usage) and dispatches the ones that need to run
*/
@@ -307,6 +353,23 @@ public abstract class TaskTracker {
}
}
/**
* Holds the status counts of the Tasks produced by a job instance, used to analyze how the instance is executing
*/
@Data
protected static class InstanceStatisticsHolder {
// Waiting to be dispatched (this state exists only in the TaskTracker's database)
protected long waitingDispatchNum;
// Dispatched but not yet acknowledged by the ProcessorTracker (the request may have been lost to a network error, or the ProcessorTracker's thread pool may be full and rejecting work)
protected long workerUnreceivedNum;
// Acknowledged by the ProcessorTracker; sitting in its thread-pool queue awaiting execution
protected long receivedNum;
// Currently executing on the ProcessorTracker
protected long runningNum;
protected long failedNum;
protected long succeedNum;
}
/**
* Initializes the TaskTracker
* @param req the server's request to run the job instance

View File

@@ -16,7 +16,7 @@ public class TaskTrackerPool {
private static final Map<Long, TaskTracker> instanceId2TaskTracker = Maps.newConcurrentMap();
/**
* Gets the ProcessorTracker, creating it if absent
* Gets the TaskTracker
*/
public static TaskTracker getTaskTrackerPool(Long instanceId) {
return instanceId2TaskTracker.get(instanceId);

View File

@@ -62,7 +62,7 @@ public class TaskPersistenceService {
try {
return execute(() -> taskDAO.batchSave(tasks));
}catch (Exception e) {
log.error("[TaskPersistenceService] batchSave tasks failed.", e);
log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e);
}
return false;
}
@@ -110,11 +110,12 @@
/**
* Gets the last task of a MapReduce / Broadcast job
*/
public Optional<TaskDO> getLastTask(Long instanceId) {
public Optional<TaskDO> getLastTask(Long instanceId, Long subInstanceId) {
try {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setSubInstanceId(subInstanceId);
query.setTaskName(TaskConstant.LAST_TASK_NAME);
return execute(() -> {
List<TaskDO> taskDOS = taskDAO.simpleQuery(query);
@@ -130,13 +131,12 @@
return Optional.empty();
}
public List<TaskDO> getAllTask(Long instanceId) {
public List<TaskDO> getAllTask(Long instanceId, Long subInstanceId) {
try {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
return execute(() -> {
return taskDAO.simpleQuery(query);
});
query.setSubInstanceId(subInstanceId);
return execute(() -> taskDAO.simpleQuery(query));
}catch (Exception e) {
log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e);
}
@@ -163,11 +163,12 @@
* Gets the status statistics for the sub-tasks managed by a TaskTracker
* TaskStatus -> num
*/
public Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId) {
public Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId, Long subInstanceId) {
try {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setSubInstanceId(subInstanceId);
query.setQueryContent("status, count(*) as num");
query.setOtherCondition("GROUP BY status");
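The same setQueryContent() projection also powers getTaskStatus() below (the "only select status" comment). Its body is not shown in this diff; a plausible sketch following the query pattern of this class (assumed, not confirmed by the commit):

SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setQueryContent("status");  // select just the status column to minimize disk I/O
// ...then execute(() -> taskDAO.simpleQuery(query)) and map the single row to a TaskStatus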
@@ -201,7 +202,7 @@
}
/**
* Queries a task's status; only the status column is selected, to save I/O -> tests show the effect is stunning... disk I/O really is a major bottleneck...
* Queries a task's status; only the status column is selected, to save I/O -> tests show the effect is stunning even on my high-end NVMe SSD, never mind an ordinary HDD... disk I/O really is a major bottleneck...
*/
public Optional<TaskStatus> getTaskStatus(Long instanceId, String taskId) {
@@ -277,7 +278,7 @@
try {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
condition.setSubInstanceId(subInstanceId);
return execute(() -> taskDAO.simpleDelete(condition));
}catch (Exception e) {
log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e);

View File

@@ -1,6 +1,5 @@
package com.github.kfcfans.oms.worker.sdk;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;

View File

@@ -4,8 +4,7 @@ import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
@@ -21,7 +20,7 @@ import org.junit.jupiter.api.Test;
* @author tjq
* @since 2020/3/25
*/
public class TaskTrackerTest {
public class CommonTaskTrackerTest {
private static ActorSelection remoteTaskTracker;
@@ -48,48 +47,14 @@ public class TaskTrackerTest {
@Test
public void testStandaloneJob() throws Exception {
remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.STANDALONE), null);
remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.CRON), null);
Thread.sleep(5000000);
}
@Test
public void testMapReduceJob() throws Exception {
remoteTaskTracker.tell(genServerScheduleJobReq(ExecuteType.MAP_REDUCE), null);
remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.MAP_REDUCE, TimeExpressionType.CRON), null);
Thread.sleep(5000000);
}
private static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType) {
ServerScheduleJobReq req = new ServerScheduleJobReq();
req.setJobId(1L);
req.setInstanceId(10086L);
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT));
req.setJobParams("this is job Params");
req.setInstanceParams("this is instance Params");
req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
req.setTaskRetryNum(3);
req.setThreadConcurrency(20);
req.setInstanceTimeoutMS(500000);
req.setTaskTimeoutMS(500000);
switch (executeType) {
case STANDALONE:
req.setExecuteType(ExecuteType.STANDALONE.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
break;
case MAP_REDUCE:
req.setExecuteType(ExecuteType.MAP_REDUCE.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
break;
case BROADCAST:
req.setExecuteType(ExecuteType.BROADCAST.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor");
break;
}
return req;
}
}

View File

@@ -0,0 +1,53 @@
package com.github.kfcfans.oms;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
* Tests for FrequentTaskTracker
*
* @author tjq
* @since 2020/4/9
*/
public class FrequentTaskTrackerTest {
private static ActorSelection remoteTaskTracker;
@BeforeAll
public static void init() throws Exception {
OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test");
ohMyConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700"));
OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig);
worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.Task_TRACKER_ACTOR_NAME);
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
}
@Test
public void testFixRateJob() throws Exception {
remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.FIX_RATE), null);
Thread.sleep(5000000);
}
@Test
public void testFixDelayJob() throws Exception {
remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.MAP_REDUCE, TimeExpressionType.FIX_DELAY), null);
Thread.sleep(5000000);
}
}

View File

@@ -0,0 +1,58 @@
package com.github.kfcfans.oms;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.common.utils.NetUtils;
import com.google.common.collect.Lists;
/**
* Utility class used by the tests
*
* @author tjq
* @since 2020/4/9
*/
public class TestUtils {
public static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType, TimeExpressionType timeExpressionType) {
ServerScheduleJobReq req = new ServerScheduleJobReq();
req.setJobId(1L);
req.setInstanceId(10086L);
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT));
req.setJobParams("JobParams");
req.setInstanceParams("InstanceParams");
req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
req.setTaskRetryNum(3);
req.setThreadConcurrency(10);
req.setInstanceTimeoutMS(500000);
req.setTaskTimeoutMS(500000);
req.setTimeExpressionType(timeExpressionType.name());
switch (timeExpressionType) {
case CRON: req.setTimeExpression("0 * * * * ? "); break;
case FIX_RATE:
case FIX_DELAY: req.setTimeExpression("5000"); break;
}
switch (executeType) {
case STANDALONE:
req.setExecuteType(ExecuteType.STANDALONE.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
break;
case MAP_REDUCE:
req.setExecuteType(ExecuteType.MAP_REDUCE.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
break;
case BROADCAST:
req.setExecuteType(ExecuteType.BROADCAST.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor");
break;
}
return req;
}
}
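Usage, as in the test classes above:

remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.FIX_RATE), null);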

View File

@@ -1,8 +1,16 @@
package com.github.kfcfans.oms.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BasicProcessor;
import com.google.common.collect.Lists;
/**
* Basic processor used for testing
@@ -14,8 +22,10 @@ public class TestBasicProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("==== ProcessResult#process");
System.out.println("TaskContext: " + context.toString());
return new ProcessResult(true, "success");
System.out.println("======== BasicProcessor#process ========");
System.out.println("TaskContext: " + JSONObject.toJSONString(context) + ";time = " + System.currentTimeMillis());
return new ProcessResult(true, System.currentTimeMillis() + "success");
}
}