[opt] code review TaskTracker and ProcessorTracker then optimized some indian code

This commit is contained in:
tjq 2020-05-17 10:45:16 +08:00
parent fe2b9c78ac
commit 67729123d2
5 changed files with 45 additions and 27 deletions

View File

@ -19,17 +19,17 @@ public class CommonUtils {
/** /**
* 重试执行仅适用于失败抛出异常的方法 * 重试执行仅适用于失败抛出异常的方法
* @param executor 需要执行的方法 * @param executor 需要执行的方法
* @param retryTimes 重试的次数 * @param tryTimes 尝试次数总执行次数
* @param intervalMS 失败后下一次执行的间隔时间 * @param intervalMS 失败后下一次执行的间隔时间
* @param <T> 执行函数返回值类型 * @param <T> 执行函数返回值类型
* @return 函数成功执行后的返回值 * @return 函数成功执行后的返回值
* @throws Exception 执行失败调用方自行处理 * @throws Exception 执行失败调用方自行处理
*/ */
public static <T> T executeWithRetry(SupplierPlus<T> executor, int retryTimes, long intervalMS) throws Exception { public static <T> T executeWithRetry(SupplierPlus<T> executor, int tryTimes, long intervalMS) throws Exception {
if (retryTimes <= 1 || intervalMS <= 0) { if (tryTimes <= 1 || intervalMS <= 0) {
return executor.get(); return executor.get();
} }
for (int i = 1; i < retryTimes; i++) { for (int i = 1; i < tryTimes; i++) {
try { try {
return executor.get(); return executor.get();
}catch (Exception e) { }catch (Exception e) {
@ -46,17 +46,17 @@ public class CommonUtils {
/** /**
* 重试执行仅适用于根据返回值决定是否执行成功的方法 * 重试执行仅适用于根据返回值决定是否执行成功的方法
* @param booleanExecutor 需要执行的方法其返回值决定了执行是否成功 * @param booleanExecutor 需要执行的方法其返回值决定了执行是否成功
* @param retryTimes 重试次数 * @param tryTimes 尝试执行次数
* @param intervalMS 失败后下一次执行的间隔时间 * @param intervalMS 失败后下一次执行的间隔时间
* @return 最终执行结果 * @return 最终执行结果
*/ */
public static boolean executeWithRetryV2(Supplier<Boolean> booleanExecutor, int retryTimes, long intervalMS) { public static boolean executeWithRetryV2(Supplier<Boolean> booleanExecutor, int tryTimes, long intervalMS) {
if (retryTimes <= 1 || intervalMS <= 0) { if (tryTimes <= 1 || intervalMS <= 0) {
return booleanExecutor.get(); return booleanExecutor.get();
} }
for (int i = 0; i < retryTimes; i++) { for (int i = 1; i < tryTimes; i++) {
try { try {
if (booleanExecutor.get()) { if (booleanExecutor.get()) {
return true; return true;

View File

@ -54,7 +54,7 @@ public class CommonTaskTracker extends TaskTracker {
persistenceRootTask(); persistenceRootTask();
// 启动定时任务任务派发 & 状态检查 // 启动定时任务任务派发 & 状态检查
scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 0, 1, TimeUnit.SECONDS); scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 0, 5, TimeUnit.SECONDS);
scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS); scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS);
} }
@ -116,7 +116,7 @@ public class CommonTaskTracker extends TaskTracker {
*/ */
private class StatusCheckRunnable implements Runnable { private class StatusCheckRunnable implements Runnable {
private static final long TIME_OUT_MS = 5000; private static final long DISPATCH_TIME_OUT_MS = 15000;
private void innerRun() { private void innerRun() {
@ -218,11 +218,11 @@ public class CommonTaskTracker extends TaskTracker {
req.setResult(result); req.setResult(result);
req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV()); req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
CompletionStage<Object> askCS = Patterns.ask(serverActor, req, Duration.ofMillis(TIME_OUT_MS)); CompletionStage<Object> askCS = Patterns.ask(serverActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
boolean serverAccepted = false; boolean serverAccepted = false;
try { try {
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(TIME_OUT_MS, TimeUnit.MILLISECONDS); AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverAccepted = askResponse.isSuccess(); serverAccepted = askResponse.isSuccess();
}catch (Exception e) { }catch (Exception e) {
log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result); log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result);
@ -250,7 +250,7 @@ public class CommonTaskTracker extends TaskTracker {
taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> { taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
long elapsedTime = currentMS - uncheckTask.getLastModifiedTime(); long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
if (elapsedTime > TIME_OUT_MS) { if (elapsedTime > DISPATCH_TIME_OUT_MS) {
TaskDO updateEntity = new TaskDO(); TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());

View File

@ -50,7 +50,7 @@ public class FrequentTaskTracker extends TaskTracker {
// 最大同时运行实例数 // 最大同时运行实例数
private int maxInstanceNum; private int maxInstanceNum;
// 总运行次数正常情况不会出现锁竞争直接用 Atomic 系列锁竞争验证推荐 LongAdder // 总运行次数正常情况不会出现锁竞争直接用 Atomic 系列锁竞争严重推荐 LongAdder
private AtomicLong triggerTimes; private AtomicLong triggerTimes;
private AtomicLong succeedTimes; private AtomicLong succeedTimes;
private AtomicLong failedTimes; private AtomicLong failedTimes;
@ -64,6 +64,7 @@ public class FrequentTaskTracker extends TaskTracker {
private static final int HISTORY_SIZE = 10; private static final int HISTORY_SIZE = 10;
private static final String LAST_TASK_ID_PREFIX = "L"; private static final String LAST_TASK_ID_PREFIX = "L";
private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
protected FrequentTaskTracker(ServerScheduleJobReq req) { protected FrequentTaskTracker(ServerScheduleJobReq req) {
super(req); super(req);
@ -120,9 +121,9 @@ public class FrequentTaskTracker extends TaskTracker {
subDetail.setSubInstanceId(subId); subDetail.setSubInstanceId(subId);
// 设置时间 // 设置时间
subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), "yyyy-MM-dd HH:mm:ss")); subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), TIME_PATTERN));
if (status == InstanceStatus.SUCCEED || status == InstanceStatus.FAILED) { if (status == InstanceStatus.SUCCEED || status == InstanceStatus.FAILED) {
subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), "yyyy-MM-dd HH:mm:ss")); subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), TIME_PATTERN));
}else { }else {
subDetail.setFinishedTime("N/A"); subDetail.setFinishedTime("N/A");
} }
@ -141,6 +142,10 @@ public class FrequentTaskTracker extends TaskTracker {
public void innerRun() { public void innerRun() {
if (finished.get()) {
return;
}
// 子任务实例ID // 子任务实例ID
Long subInstanceId = triggerTimes.incrementAndGet(); Long subInstanceId = triggerTimes.incrementAndGet();

View File

@ -35,6 +35,7 @@ import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/** /**
@ -63,7 +64,7 @@ public abstract class TaskTracker {
// 上报时间缓存 // 上报时间缓存
private Cache<String, Long> taskId2LastReportTime; private Cache<String, Long> taskId2LastReportTime;
// 分段锁 // 分段锁
private ReentrantLock[] locks = new ReentrantLock[UPDATE_CONCURRENCY]; private Lock[] locks = new ReentrantLock[UPDATE_CONCURRENCY];
private static final int UPDATE_CONCURRENCY = 8; private static final int UPDATE_CONCURRENCY = 8;
private static final int UPDATE_LOCK_MASK = UPDATE_CONCURRENCY - 1; private static final int UPDATE_LOCK_MASK = UPDATE_CONCURRENCY - 1;
@ -98,7 +99,7 @@ public abstract class TaskTracker {
// 子类自定义初始化操作 // 子类自定义初始化操作
initTaskTracker(req); initTaskTracker(req);
log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", req.getInstanceId(), req); log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", instanceId, req);
} }
/** /**
@ -126,13 +127,17 @@ public abstract class TaskTracker {
*/ */
public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) { public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) {
ReentrantLock lock = locks[taskId.hashCode() & UPDATE_LOCK_MASK]; if (finished.get()) {
return;
}
Lock lock = locks[taskId.hashCode() & UPDATE_LOCK_MASK];
TaskStatus nTaskStatus = TaskStatus.of(newStatus); TaskStatus nTaskStatus = TaskStatus.of(newStatus);
try { try {
// 阻塞获取锁 // 阻塞获取锁
lock.lock(); lock.lockInterruptibly();
Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId); Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId);
@ -207,7 +212,8 @@ public abstract class TaskTracker {
log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId); log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId);
} }
}finally { } catch (InterruptedException ignore) {
} finally {
lock.unlock(); lock.unlock();
} }
} }
@ -217,6 +223,9 @@ public abstract class TaskTracker {
* @param newTaskList 新增的子任务列表 * @param newTaskList 新增的子任务列表
*/ */
public boolean submitTask(List<TaskDO> newTaskList) { public boolean submitTask(List<TaskDO> newTaskList) {
if (finished.get()) {
return true;
}
if (CollectionUtils.isEmpty(newTaskList)) { if (CollectionUtils.isEmpty(newTaskList)) {
return true; return true;
} }
@ -253,6 +262,10 @@ public abstract class TaskTracker {
*/ */
public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, long reportTime, String result) { public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, long reportTime, String result) {
if (finished.get()) {
return;
}
log.info("[TaskTracker-{}] finished broadcast's preProcess.", instanceId); log.info("[TaskTracker-{}] finished broadcast's preProcess.", instanceId);
// 1. 生成集群子任务 // 1. 生成集群子任务
@ -281,6 +294,8 @@ public abstract class TaskTracker {
*/ */
public void destroy() { public void destroy() {
finished.set(true);
Stopwatch sw = Stopwatch.createStarted(); Stopwatch sw = Stopwatch.createStarted();
// 0. 开始关闭线程池不能使用 shutdownNow()因为 destroy 方法本身就在 scheduledPool 的线程中执行强行关闭会打断 destroy 的执行 // 0. 开始关闭线程池不能使用 shutdownNow()因为 destroy 方法本身就在 scheduledPool 的线程中执行强行关闭会打断 destroy 的执行
scheduledPool.shutdown(); scheduledPool.shutdown();
@ -299,8 +314,7 @@ public abstract class TaskTracker {
// 2. 删除所有数据库数据 // 2. 删除所有数据库数据
boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId); boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId);
if (!dbSuccess) { if (!dbSuccess) {
log.error("[TaskTracker-{}] delete tasks from database failed, shutdown TaskTracker failed.", instanceId); log.error("[TaskTracker-{}] delete tasks from database failed.", instanceId);
return;
}else { }else {
log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId); log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId);
} }
@ -422,12 +436,11 @@ public abstract class TaskTracker {
// 数量不足 查询失败则终止循环 // 数量不足 查询失败则终止循环
if (needDispatchTasks.size() < dbQueryLimit) { if (needDispatchTasks.size() < dbQueryLimit) {
log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch); break;
return;
} }
} }
log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch); log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
} }
} }

View File

@ -203,7 +203,7 @@ public class TaskPersistenceService {
} }
/** /**
* 查询 taskId -> taskResultreduce阶段或postProcess 阶段使用 * 查询所有Task执行结果reduce阶段 postProcess阶段 使用
*/ */
public List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId) { public List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId) {
try { try {