mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: server async process log report to prevent timeout #432
This commit is contained in:
parent
3869b115ce
commit
3b73a750e6
@ -2,6 +2,7 @@ package tech.powerjob.server.common;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@ -16,6 +17,19 @@ public class RejectedExecutionHandlerFactory {
|
||||
|
||||
private static final AtomicLong COUNTER = new AtomicLong();
|
||||
|
||||
/**
|
||||
* 拒绝执行,抛出 RejectedExecutionException
|
||||
* @param source name for log
|
||||
* @return A handler for tasks that cannot be executed by ThreadPool
|
||||
*/
|
||||
public static RejectedExecutionHandler newAbort(String source) {
|
||||
return (r, e) -> {
|
||||
log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Abort, Maybe you need to adjust the ThreadPool config!", source, e, r);
|
||||
throw new RejectedExecutionException("Task " + r.toString() +
|
||||
" rejected from " + source);
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 直接丢弃该任务
|
||||
* @param source log name
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.server.common.timewheel.holder;
|
||||
|
||||
import tech.powerjob.server.common.timewheel.HashedWheelTimer;
|
||||
import tech.powerjob.server.common.timewheel.Timer;
|
||||
|
||||
/**
|
||||
* 时间轮单例
|
||||
@ -11,7 +12,7 @@ import tech.powerjob.server.common.timewheel.HashedWheelTimer;
|
||||
public class HashedWheelTimerHolder {
|
||||
|
||||
// 非精确时间轮,每 5S 走一格
|
||||
public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0);
|
||||
public static final Timer INACCURATE_TIMER = new HashedWheelTimer(5000, 16, 0);
|
||||
|
||||
private HashedWheelTimerHolder() {
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.server.common.timewheel.holder;
|
||||
|
||||
import tech.powerjob.server.common.timewheel.HashedWheelTimer;
|
||||
import tech.powerjob.server.common.timewheel.Timer;
|
||||
import tech.powerjob.server.common.timewheel.TimerFuture;
|
||||
import tech.powerjob.server.common.timewheel.TimerTask;
|
||||
import com.google.common.collect.Maps;
|
||||
@ -19,9 +20,9 @@ public class InstanceTimeWheelService {
|
||||
private static final Map<Long, TimerFuture> CARGO = Maps.newConcurrentMap();
|
||||
|
||||
// 精确调度时间轮,每 1MS 走一格
|
||||
private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
|
||||
private static final Timer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
|
||||
// 非精确调度时间轮,用于处理高延迟任务,每 10S 走一格
|
||||
private static final HashedWheelTimer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0);
|
||||
private static final Timer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0);
|
||||
|
||||
// 支持取消的时间间隔,低于该阈值则不会放进 CARGO
|
||||
private static final long MIN_INTERVAL_MS = 1000;
|
||||
|
@ -94,7 +94,6 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
|
||||
@Override
|
||||
public void processWorkerLogReport(WorkerLogReportReq req) {
|
||||
|
||||
long startMs = System.currentTimeMillis();
|
||||
WorkerLogReportEvent event = new WorkerLogReportEvent()
|
||||
.setWorkerAddress(req.getWorkerAddress());
|
||||
try {
|
||||
@ -106,7 +105,6 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
|
||||
event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
|
||||
log.warn("[WorkerRequestHandler] process worker report failed!", t);
|
||||
} finally {
|
||||
event.setServerCost(System.currentTimeMillis() - startMs);
|
||||
monitorService.monitor(event);
|
||||
}
|
||||
}
|
||||
|
@ -51,6 +51,9 @@ public class InstanceLogService {
|
||||
@Value("${server.port}")
|
||||
private int port;
|
||||
|
||||
@Resource
|
||||
private Executor omsLocalDbPool;
|
||||
|
||||
@Resource
|
||||
private InstanceMetadataService instanceMetadataService;
|
||||
@Resource
|
||||
@ -99,20 +102,22 @@ public class InstanceLogService {
|
||||
*/
|
||||
public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {
|
||||
|
||||
List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
|
||||
instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());
|
||||
omsLocalDbPool.execute(() -> {
|
||||
List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
|
||||
instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());
|
||||
|
||||
LocalInstanceLogDO y = new LocalInstanceLogDO();
|
||||
BeanUtils.copyProperties(x, y);
|
||||
y.setWorkerAddress(workerAddress);
|
||||
return y;
|
||||
}).collect(Collectors.toList());
|
||||
LocalInstanceLogDO y = new LocalInstanceLogDO();
|
||||
BeanUtils.copyProperties(x, y);
|
||||
y.setWorkerAddress(workerAddress);
|
||||
return y;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
try {
|
||||
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
|
||||
}catch (Exception e) {
|
||||
log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
|
||||
}
|
||||
try {
|
||||
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
|
||||
}catch (Exception e) {
|
||||
log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -191,7 +191,7 @@ public class InstanceManager {
|
||||
log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());
|
||||
|
||||
// 上报日志数据
|
||||
HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 15, TimeUnit.SECONDS);
|
||||
HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 60, TimeUnit.SECONDS);
|
||||
|
||||
// workflow 特殊处理
|
||||
if (wfInstanceId != null) {
|
||||
|
@ -22,8 +22,6 @@ public class WorkerLogReportEvent implements Event {
|
||||
|
||||
private Status status;
|
||||
|
||||
private long serverCost;
|
||||
|
||||
public enum Status {
|
||||
SUCCESS,
|
||||
REJECTED,
|
||||
@ -37,6 +35,6 @@ public class WorkerLogReportEvent implements Event {
|
||||
|
||||
@Override
|
||||
public String message() {
|
||||
return SJ.MONITOR_JOINER.join(workerAddress, logSize, status, serverCost);
|
||||
return SJ.MONITOR_JOINER.join(workerAddress, logSize, status);
|
||||
}
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ public class ThreadPoolConfig {
|
||||
executor.setQueueCapacity(0);
|
||||
executor.setKeepAliveSeconds(60);
|
||||
executor.setThreadNamePrefix("omsTimingPool-");
|
||||
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJobTiming"));
|
||||
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PJ-TIMING"));
|
||||
return executor;
|
||||
}
|
||||
|
||||
@ -46,7 +46,18 @@ public class ThreadPoolConfig {
|
||||
executor.setQueueCapacity(8192);
|
||||
executor.setKeepAliveSeconds(60);
|
||||
executor.setThreadNamePrefix("omsBackgroundPool-");
|
||||
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newDiscard("PowerJobBackgroundPool"));
|
||||
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newDiscard("PJ-BACKGROUND"));
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Bean("omsLocalDbPool")
|
||||
public Executor initOmsLocalDbPool() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() / 2);
|
||||
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() / 2);
|
||||
executor.setQueueCapacity(2048);
|
||||
executor.setThreadNamePrefix("omsLocalDbPool-");
|
||||
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newAbort("PJ-LOCAL-DB"));
|
||||
return executor;
|
||||
}
|
||||
|
||||
@ -55,7 +66,7 @@ public class ThreadPoolConfig {
|
||||
public TaskScheduler taskScheduler() {
|
||||
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
|
||||
scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
|
||||
scheduler.setThreadNamePrefix("PowerJobSchedulePool-");
|
||||
scheduler.setThreadNamePrefix("PJ-WS-");
|
||||
scheduler.setDaemon(true);
|
||||
return scheduler;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user