mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
Merge branch 'dev' into 4.3.7_v2
This commit is contained in:
commit
ab7a398f61
@ -69,10 +69,10 @@ public class OmsLogHandler {
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
private class LogSubmitter implements Runnable {
|
private class LogSubmitter extends RunnableAndCatch {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run0() {
|
||||||
|
|
||||||
boolean lockResult = reportLock.tryLock();
|
boolean lockResult = reportLock.tryLock();
|
||||||
if (!lockResult) {
|
if (!lockResult) {
|
||||||
|
@ -0,0 +1,25 @@
|
|||||||
|
package tech.powerjob.worker.background;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 使用 {@link ScheduledExecutorService} 执行任务时,推荐继承此类捕获并打印异常,避免因为抛出异常导致周期性任务终止
|
||||||
|
*
|
||||||
|
* @author songyinyin
|
||||||
|
* @since 2023/9/20 15:52
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public abstract class RunnableAndCatch implements Runnable{
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
run0();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[RunnableAndCatch] run failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void run0();
|
||||||
|
}
|
@ -0,0 +1,30 @@
|
|||||||
|
package tech.powerjob.worker.background;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 使用 {@link ScheduledExecutorService} 执行任务时,推荐使用此对象包装一层,避免因为抛出异常导致周期性任务终止
|
||||||
|
*
|
||||||
|
* @author songyinyin
|
||||||
|
* @since 2023/9/20 16:04
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class RunnableWrapper implements Runnable {
|
||||||
|
|
||||||
|
private final Runnable runnable;
|
||||||
|
|
||||||
|
public RunnableWrapper(Runnable runnable) {
|
||||||
|
this.runnable = runnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
runnable.run();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[RunnableWrapper] run failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -22,12 +22,12 @@ import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager;
|
|||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class WorkerHealthReporter implements Runnable {
|
public class WorkerHealthReporter extends RunnableAndCatch {
|
||||||
|
|
||||||
private final WorkerRuntime workerRuntime;
|
private final WorkerRuntime workerRuntime;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run0() {
|
||||||
|
|
||||||
// 没有可用Server,无法上报
|
// 没有可用Server,无法上报
|
||||||
String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
|
String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
|
||||||
|
@ -9,6 +9,7 @@ import tech.powerjob.common.enums.ProcessorType;
|
|||||||
import tech.powerjob.common.enums.TimeExpressionType;
|
import tech.powerjob.common.enums.TimeExpressionType;
|
||||||
import tech.powerjob.common.utils.CollectionUtils;
|
import tech.powerjob.common.utils.CollectionUtils;
|
||||||
import tech.powerjob.common.utils.CommonUtils;
|
import tech.powerjob.common.utils.CommonUtils;
|
||||||
|
import tech.powerjob.worker.background.RunnableAndCatch;
|
||||||
import tech.powerjob.worker.common.WorkerRuntime;
|
import tech.powerjob.worker.common.WorkerRuntime;
|
||||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||||
import tech.powerjob.worker.common.utils.TransportUtils;
|
import tech.powerjob.worker.common.utils.TransportUtils;
|
||||||
@ -237,11 +238,11 @@ public class ProcessorTracker {
|
|||||||
/**
|
/**
|
||||||
* 定时向 TaskTracker 汇报(携带任务执行信息的心跳)
|
* 定时向 TaskTracker 汇报(携带任务执行信息的心跳)
|
||||||
*/
|
*/
|
||||||
private class CheckerAndReporter implements Runnable {
|
private class CheckerAndReporter extends RunnableAndCatch {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings({"squid:S1066","squid:S3776"})
|
@SuppressWarnings({"squid:S1066","squid:S3776"})
|
||||||
public void run() {
|
public void run0() {
|
||||||
|
|
||||||
// 超时检查,如果超时则自动关闭 TaskTracker
|
// 超时检查,如果超时则自动关闭 TaskTracker
|
||||||
long interval = System.currentTimeMillis() - startTime;
|
long interval = System.currentTimeMillis() - startTime;
|
||||||
|
@ -19,6 +19,7 @@ import tech.powerjob.common.serialize.JsonUtils;
|
|||||||
import tech.powerjob.common.utils.CollectionUtils;
|
import tech.powerjob.common.utils.CollectionUtils;
|
||||||
import tech.powerjob.common.utils.CommonUtils;
|
import tech.powerjob.common.utils.CommonUtils;
|
||||||
import tech.powerjob.common.utils.SegmentLock;
|
import tech.powerjob.common.utils.SegmentLock;
|
||||||
|
import tech.powerjob.worker.background.RunnableAndCatch;
|
||||||
import tech.powerjob.worker.common.WorkerRuntime;
|
import tech.powerjob.worker.common.WorkerRuntime;
|
||||||
import tech.powerjob.worker.common.constants.TaskConstant;
|
import tech.powerjob.worker.common.constants.TaskConstant;
|
||||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||||
@ -444,13 +445,13 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
|||||||
/**
|
/**
|
||||||
* 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去
|
* 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去
|
||||||
*/
|
*/
|
||||||
protected class Dispatcher implements Runnable {
|
protected class Dispatcher extends RunnableAndCatch {
|
||||||
|
|
||||||
// 数据库查询限制,每次最多查询几个任务
|
// 数据库查询限制,每次最多查询几个任务
|
||||||
private static final int DB_QUERY_LIMIT = 100;
|
private static final int DB_QUERY_LIMIT = 100;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run0() {
|
||||||
|
|
||||||
if (finished.get()) {
|
if (finished.get()) {
|
||||||
return;
|
return;
|
||||||
@ -502,9 +503,9 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
|||||||
* 执行器动态上线(for 秒级任务和 MR 任务)
|
* 执行器动态上线(for 秒级任务和 MR 任务)
|
||||||
* 原则:server 查询得到的 执行器状态不会干预 worker 自己维护的状态,即只做新增,不做任何修改
|
* 原则:server 查询得到的 执行器状态不会干预 worker 自己维护的状态,即只做新增,不做任何修改
|
||||||
*/
|
*/
|
||||||
protected class WorkerDetector implements Runnable {
|
protected class WorkerDetector extends RunnableAndCatch {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run0() {
|
||||||
|
|
||||||
boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker();
|
boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker();
|
||||||
log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker);
|
log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker);
|
||||||
|
@ -9,6 +9,7 @@ import tech.powerjob.common.enums.InstanceStatus;
|
|||||||
import tech.powerjob.common.model.InstanceDetail;
|
import tech.powerjob.common.model.InstanceDetail;
|
||||||
import tech.powerjob.common.request.ServerScheduleJobReq;
|
import tech.powerjob.common.request.ServerScheduleJobReq;
|
||||||
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
|
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
|
||||||
|
import tech.powerjob.worker.background.RunnableWrapper;
|
||||||
import tech.powerjob.worker.common.WorkerRuntime;
|
import tech.powerjob.worker.common.WorkerRuntime;
|
||||||
import tech.powerjob.worker.common.constants.TaskConstant;
|
import tech.powerjob.worker.common.constants.TaskConstant;
|
||||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||||
@ -93,14 +94,14 @@ public class LightTaskTracker extends TaskTracker {
|
|||||||
// 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段
|
// 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段
|
||||||
long initDelay = RandomUtils.nextInt(5000, 10000);
|
long initDelay = RandomUtils.nextInt(5000, 10000);
|
||||||
// 上报任务状态
|
// 上报任务状态
|
||||||
statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS);
|
statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(new RunnableWrapper(this::checkAndReportStatus), initDelay, delay, TimeUnit.MILLISECONDS);
|
||||||
// 超时控制
|
// 超时控制
|
||||||
if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
|
if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
|
||||||
if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
|
if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
|
||||||
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
|
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
|
||||||
} else {
|
} else {
|
||||||
// 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s
|
// 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s
|
||||||
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
|
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
timeoutCheckScheduledFuture = null;
|
timeoutCheckScheduledFuture = null;
|
||||||
@ -148,7 +149,12 @@ public class LightTaskTracker extends TaskTracker {
|
|||||||
}
|
}
|
||||||
LightTaskTrackerManager.removeTaskTracker(instanceId);
|
LightTaskTrackerManager.removeTaskTracker(instanceId);
|
||||||
// 最后一列为总耗时(即占用资源的耗时,当前时间减去创建时间)
|
// 最后一列为总耗时(即占用资源的耗时,当前时间减去创建时间)
|
||||||
log.warn("[TaskTracker-{}] remove TaskTracker,task status {},start time:{},end time:{},real cost:{},total time:{}", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime);
|
String msg = String.format("[TaskTracker-%s] remove TaskTracker,task status %s,start time:%s,end time:%s,real cost:%s,total time:%s", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime);
|
||||||
|
if (TaskStatus.WORKER_PROCESS_SUCCESS.equals(status)) {
|
||||||
|
log.info(msg);
|
||||||
|
} else {
|
||||||
|
log.warn(msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
x
Reference in New Issue
Block a user