diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java index 8e155065..955c074e 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/oms/common/utils/CommonUtils.java @@ -19,17 +19,17 @@ public class CommonUtils { /** * 重试执行,仅适用于失败抛出异常的方法 * @param executor 需要执行的方法 - * @param retryTimes 重试的次数 + * @param tryTimes 尝试次数(总执行次数) * @param intervalMS 失败后下一次执行的间隔时间 * @param 执行函数返回值类型 * @return 函数成功执行后的返回值 * @throws Exception 执行失败,调用方自行处理 */ - public static T executeWithRetry(SupplierPlus executor, int retryTimes, long intervalMS) throws Exception { - if (retryTimes <= 1 || intervalMS <= 0) { + public static T executeWithRetry(SupplierPlus executor, int tryTimes, long intervalMS) throws Exception { + if (tryTimes <= 1 || intervalMS <= 0) { return executor.get(); } - for (int i = 1; i < retryTimes; i++) { + for (int i = 1; i < tryTimes; i++) { try { return executor.get(); }catch (Exception e) { @@ -46,17 +46,17 @@ public class CommonUtils { /** * 重试执行,仅适用于根据返回值决定是否执行成功的方法 * @param booleanExecutor 需要执行的方法,其返回值决定了执行是否成功 - * @param retryTimes 重试次数 + * @param tryTimes 尝试执行次数 * @param intervalMS 失败后下一次执行的间隔时间 * @return 最终执行结果 */ - public static boolean executeWithRetryV2(Supplier booleanExecutor, int retryTimes, long intervalMS) { + public static boolean executeWithRetryV2(Supplier booleanExecutor, int tryTimes, long intervalMS) { - if (retryTimes <= 1 || intervalMS <= 0) { + if (tryTimes <= 1 || intervalMS <= 0) { return booleanExecutor.get(); } - for (int i = 0; i < retryTimes; i++) { + for (int i = 1; i < tryTimes; i++) { try { if (booleanExecutor.get()) { return true; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java index 62b00cce..90b6f504 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java @@ -54,7 +54,7 @@ public class CommonTaskTracker extends TaskTracker { persistenceRootTask(); // 启动定时任务(任务派发 & 状态检查) - scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 0, 1, TimeUnit.SECONDS); + scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 0, 5, TimeUnit.SECONDS); scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS); } @@ -116,7 +116,7 @@ public class CommonTaskTracker extends TaskTracker { */ private class StatusCheckRunnable implements Runnable { - private static final long TIME_OUT_MS = 5000; + private static final long DISPATCH_TIME_OUT_MS = 15000; private void innerRun() { @@ -218,11 +218,11 @@ public class CommonTaskTracker extends TaskTracker { req.setResult(result); req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV()); - CompletionStage askCS = Patterns.ask(serverActor, req, Duration.ofMillis(TIME_OUT_MS)); + CompletionStage askCS = Patterns.ask(serverActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); boolean serverAccepted = false; try { - AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(TIME_OUT_MS, TimeUnit.MILLISECONDS); + AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); serverAccepted = askResponse.isSuccess(); }catch (Exception e) { log.warn("[TaskTracker-{}] report finished status failed, result={}.", instanceId, result); @@ -250,7 +250,7 @@ public class CommonTaskTracker extends TaskTracker { taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> { long elapsedTime = currentMS - uncheckTask.getLastModifiedTime(); - if (elapsedTime > TIME_OUT_MS) { + if (elapsedTime > DISPATCH_TIME_OUT_MS) { TaskDO updateEntity = new TaskDO(); updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java index 1b71013b..9f93a217 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java @@ -50,7 +50,7 @@ public class FrequentTaskTracker extends TaskTracker { // 最大同时运行实例数 private int maxInstanceNum; - // 总运行次数(正常情况不会出现锁竞争,直接用 Atomic 系列,锁竞争验证推荐 LongAdder) + // 总运行次数(正常情况不会出现锁竞争,直接用 Atomic 系列,锁竞争严重推荐 LongAdder) private AtomicLong triggerTimes; private AtomicLong succeedTimes; private AtomicLong failedTimes; @@ -64,6 +64,7 @@ public class FrequentTaskTracker extends TaskTracker { private static final int HISTORY_SIZE = 10; private static final String LAST_TASK_ID_PREFIX = "L"; + private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; protected FrequentTaskTracker(ServerScheduleJobReq req) { super(req); @@ -120,9 +121,9 @@ public class FrequentTaskTracker extends TaskTracker { subDetail.setSubInstanceId(subId); // 设置时间 - subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), "yyyy-MM-dd HH:mm:ss")); + subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), TIME_PATTERN)); if (status == InstanceStatus.SUCCEED || status == InstanceStatus.FAILED) { - subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), "yyyy-MM-dd HH:mm:ss")); + subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), TIME_PATTERN)); }else { subDetail.setFinishedTime("N/A"); } @@ -141,6 +142,10 @@ public class FrequentTaskTracker extends TaskTracker { public void innerRun() { + if (finished.get()) { + return; + } + // 子任务实例ID Long subInstanceId = triggerTimes.incrementAndGet(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index 655d9db7..a7879bb1 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -35,6 +35,7 @@ import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** @@ -63,7 +64,7 @@ public abstract class TaskTracker { // 上报时间缓存 private Cache taskId2LastReportTime; // 分段锁 - private ReentrantLock[] locks = new ReentrantLock[UPDATE_CONCURRENCY]; + private Lock[] locks = new ReentrantLock[UPDATE_CONCURRENCY]; private static final int UPDATE_CONCURRENCY = 8; private static final int UPDATE_LOCK_MASK = UPDATE_CONCURRENCY - 1; @@ -98,7 +99,7 @@ public abstract class TaskTracker { // 子类自定义初始化操作 initTaskTracker(req); - log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", req.getInstanceId(), req); + log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", instanceId, req); } /** @@ -126,13 +127,17 @@ public abstract class TaskTracker { */ public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) { - ReentrantLock lock = locks[taskId.hashCode() & UPDATE_LOCK_MASK]; + if (finished.get()) { + return; + } + + Lock lock = locks[taskId.hashCode() & UPDATE_LOCK_MASK]; TaskStatus nTaskStatus = TaskStatus.of(newStatus); try { // 阻塞获取锁 - lock.lock(); + lock.lockInterruptibly(); Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId); @@ -207,7 +212,8 @@ public abstract class TaskTracker { log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId); } - }finally { + } catch (InterruptedException ignore) { + } finally { lock.unlock(); } } @@ -217,6 +223,9 @@ public abstract class TaskTracker { * @param newTaskList 新增的子任务列表 */ public boolean submitTask(List newTaskList) { + if (finished.get()) { + return true; + } if (CollectionUtils.isEmpty(newTaskList)) { return true; } @@ -253,6 +262,10 @@ public abstract class TaskTracker { */ public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, long reportTime, String result) { + if (finished.get()) { + return; + } + log.info("[TaskTracker-{}] finished broadcast's preProcess.", instanceId); // 1. 生成集群子任务 @@ -281,6 +294,8 @@ public abstract class TaskTracker { */ public void destroy() { + finished.set(true); + Stopwatch sw = Stopwatch.createStarted(); // 0. 开始关闭线程池,不能使用 shutdownNow(),因为 destroy 方法本身就在 scheduledPool 的线程中执行,强行关闭会打断 destroy 的执行。 scheduledPool.shutdown(); @@ -299,8 +314,7 @@ public abstract class TaskTracker { // 2. 删除所有数据库数据 boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId); if (!dbSuccess) { - log.error("[TaskTracker-{}] delete tasks from database failed, shutdown TaskTracker failed.", instanceId); - return; + log.error("[TaskTracker-{}] delete tasks from database failed.", instanceId); }else { log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId); } @@ -422,12 +436,11 @@ public abstract class TaskTracker { // 数量不足 或 查询失败,则终止循环 if (needDispatchTasks.size() < dbQueryLimit) { - log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch); - return; + break; } } - log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch); + log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop()); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index c8dc8f76..2d7d3eb6 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -203,7 +203,7 @@ public class TaskPersistenceService { } /** - * 查询 taskId -> taskResult,reduce阶段或postProcess 阶段使用 + * 查询所有Task执行结果,reduce阶段 或 postProcess阶段 使用 */ public List getAllTaskResult(Long instanceId, Long subInstanceId) { try {