fix: 周期性任务出现异常时,导致任务停止

This commit is contained in:
songyinyin 2023-09-20 17:17:36 +08:00
parent 592dff8d75
commit 8f3803bda6
7 changed files with 71 additions and 13 deletions

View File

@ -69,10 +69,10 @@ public class OmsLogHandler {
private class LogSubmitter implements Runnable {
private class LogSubmitter extends RunnableAndCatch {
@Override
public void run() {
public void run0() {
boolean lockResult = reportLock.tryLock();
if (!lockResult) {

View File

@ -0,0 +1,25 @@
package tech.powerjob.worker.background;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ScheduledExecutorService;
/**
* 使用 {@link ScheduledExecutorService} 执行任务时推荐继承此类捕获并打印异常避免因为抛出异常导致周期性任务终止
*
* @author songyinyin
* @since 2023/9/20 15:52
*/
@Slf4j
public abstract class RunnableAndCatch implements Runnable{
@Override
public void run() {
try {
run0();
} catch (Exception e) {
log.error("[RunnableAndCatch] run failed", e);
}
}
protected abstract void run0();
}

View File

@ -0,0 +1,30 @@
package tech.powerjob.worker.background;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ScheduledExecutorService;
/**
* 使用 {@link ScheduledExecutorService} 执行任务时推荐使用此对象包装一层避免因为抛出异常导致周期性任务终止
*
* @author songyinyin
* @since 2023/9/20 16:04
*/
@Slf4j
public class RunnableWrapper implements Runnable {
private final Runnable runnable;
public RunnableWrapper(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
try {
runnable.run();
} catch (Exception e) {
log.error("[RunnableWrapper] run failed", e);
}
}
}

View File

@ -22,12 +22,12 @@ import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager;
*/
@Slf4j
@RequiredArgsConstructor
public class WorkerHealthReporter implements Runnable {
public class WorkerHealthReporter extends RunnableAndCatch {
private final WorkerRuntime workerRuntime;
@Override
public void run() {
public void run0() {
// 没有可用Server无法上报
String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();

View File

@ -9,6 +9,7 @@ import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.background.RunnableAndCatch;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.TransportUtils;
@ -237,11 +238,11 @@ public class ProcessorTracker {
/**
* 定时向 TaskTracker 汇报携带任务执行信息的心跳
*/
private class CheckerAndReporter implements Runnable {
private class CheckerAndReporter extends RunnableAndCatch {
@Override
@SuppressWarnings({"squid:S1066","squid:S3776"})
public void run() {
public void run0() {
// 超时检查如果超时则自动关闭 TaskTracker
long interval = System.currentTimeMillis() - startTime;

View File

@ -19,6 +19,7 @@ import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.worker.background.RunnableAndCatch;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
@ -445,13 +446,13 @@ public abstract class HeavyTaskTracker extends TaskTracker {
/**
* 定时扫描数据库中的task出于内存占用量考虑每次最多获取100个并将需要执行的任务派发出去
*/
protected class Dispatcher implements Runnable {
protected class Dispatcher extends RunnableAndCatch {
// 数据库查询限制每次最多查询几个任务
private static final int DB_QUERY_LIMIT = 100;
@Override
public void run() {
public void run0() {
if (finished.get()) {
return;
@ -503,9 +504,9 @@ public abstract class HeavyTaskTracker extends TaskTracker {
* 执行器动态上线for 秒级任务和 MR 任务
* 原则server 查询得到的 执行器状态不会干预 worker 自己维护的状态即只做新增不做任何修改
*/
protected class WorkerDetector implements Runnable {
protected class WorkerDetector extends RunnableAndCatch {
@Override
public void run() {
public void run0() {
boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker();
log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker);

View File

@ -9,6 +9,7 @@ import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.worker.background.RunnableWrapper;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskConstant;
import tech.powerjob.worker.common.constants.TaskStatus;
@ -93,14 +94,14 @@ public class LightTaskTracker extends TaskTracker {
// 初始延迟加入随机值避免在高并发场景下所有请求集中在一个时间段
long initDelay = RandomUtils.nextInt(5000, 10000);
// 上报任务状态
statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS);
statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(new RunnableWrapper(this::checkAndReportStatus), initDelay, delay, TimeUnit.MILLISECONDS);
// 超时控制
if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
} else {
// 执行时间超过 1 s 的任务超时检测最小颗粒度为 1 s
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
}
} else {
timeoutCheckScheduledFuture = null;