diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java index a9ce0413..89c56afc 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java @@ -69,10 +69,10 @@ public class OmsLogHandler { - private class LogSubmitter implements Runnable { + private class LogSubmitter extends RunnableAndCatch { @Override - public void run() { + public void run0() { boolean lockResult = reportLock.tryLock(); if (!lockResult) { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java new file mode 100644 index 00000000..df45ebfe --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java @@ -0,0 +1,25 @@ +package tech.powerjob.worker.background; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * 使用 {@link ScheduledExecutorService} 执行任务时,推荐继承此类捕获并打印异常,避免因为抛出异常导致周期性任务终止 + * + * @author songyinyin + * @since 2023/9/20 15:52 + */ +@Slf4j +public abstract class RunnableAndCatch implements Runnable{ + @Override + public void run() { + try { + run0(); + } catch (Exception e) { + log.error("[RunnableAndCatch] run failed", e); + } + } + + protected abstract void run0(); +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java new file mode 100644 index 00000000..0622d382 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java @@ -0,0 +1,30 @@ +package tech.powerjob.worker.background; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * 使用 {@link ScheduledExecutorService} 执行任务时,推荐使用此对象包装一层,避免因为抛出异常导致周期性任务终止 + * + * @author songyinyin + * @since 2023/9/20 16:04 + */ +@Slf4j +public class RunnableWrapper implements Runnable { + + private final Runnable runnable; + + public RunnableWrapper(Runnable runnable) { + this.runnable = runnable; + } + + @Override + public void run() { + try { + runnable.run(); + } catch (Exception e) { + log.error("[RunnableWrapper] run failed", e); + } + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java index 645d74ed..295da2a3 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java @@ -22,12 +22,12 @@ import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager; */ @Slf4j @RequiredArgsConstructor -public class WorkerHealthReporter implements Runnable { +public class WorkerHealthReporter extends RunnableAndCatch { private final WorkerRuntime workerRuntime; @Override - public void run() { + public void run0() { // 没有可用Server,无法上报 String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 7bc79f68..4accd113 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -9,6 +9,7 @@ import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.worker.background.RunnableAndCatch; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.TransportUtils; @@ -237,11 +238,11 @@ public class ProcessorTracker { /** * 定时向 TaskTracker 汇报(携带任务执行信息的心跳) */ - private class CheckerAndReporter implements Runnable { + private class CheckerAndReporter extends RunnableAndCatch { @Override @SuppressWarnings({"squid:S1066","squid:S3776"}) - public void run() { + public void run0() { // 超时检查,如果超时则自动关闭 TaskTracker long interval = System.currentTimeMillis() - startTime; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 4f75e0ab..e8d9c7dd 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -19,6 +19,7 @@ import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.SegmentLock; +import tech.powerjob.worker.background.RunnableAndCatch; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; @@ -445,13 +446,13 @@ public abstract class HeavyTaskTracker extends TaskTracker { /** * 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去 */ - protected class Dispatcher implements Runnable { + protected class Dispatcher extends RunnableAndCatch { // 数据库查询限制,每次最多查询几个任务 private static final int DB_QUERY_LIMIT = 100; @Override - public void run() { + public void run0() { if (finished.get()) { return; @@ -503,9 +504,9 @@ public abstract class HeavyTaskTracker extends TaskTracker { * 执行器动态上线(for 秒级任务和 MR 任务) * 原则:server 查询得到的 执行器状态不会干预 worker 自己维护的状态,即只做新增,不做任何修改 */ - protected class WorkerDetector implements Runnable { + protected class WorkerDetector extends RunnableAndCatch { @Override - public void run() { + public void run0() { boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker(); log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java index 791256d0..8f8a1819 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java @@ -9,6 +9,7 @@ import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import tech.powerjob.worker.background.RunnableWrapper; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; @@ -93,14 +94,14 @@ public class LightTaskTracker extends TaskTracker { // 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段 long initDelay = RandomUtils.nextInt(5000, 10000); // 上报任务状态 - statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS); + statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(new RunnableWrapper(this::checkAndReportStatus), initDelay, delay, TimeUnit.MILLISECONDS); // 超时控制 if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) { if (instanceInfo.getInstanceTimeoutMS() < 1000L) { - timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS); + timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS); } else { // 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s - timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS); + timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS); } } else { timeoutCheckScheduledFuture = null;