From 3528a7724fe086b807fd13a4ff5661adc8b726db Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 8 Jul 2020 15:09:20 +0800 Subject: [PATCH] [fix] fix the problem of log sync threadpool oversize --- .../common/config/ThreadPoolConfig.java | 24 ++++++++++++++++--- .../server/service/InstanceLogService.java | 8 +------ .../service/instance/InstanceManager.java | 2 +- .../schedule/HashedWheelTimerHolder.java | 4 ++++ 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index 3dc4fd71..fb784a0e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -8,8 +8,7 @@ import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.*; /** * 公用线程池配置 @@ -51,7 +50,19 @@ public class ThreadPoolConfig { executor.setQueueCapacity(1024); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("omsCommonPool-"); - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + executor.setRejectedExecutionHandler(new LogOnRejected()); + return executor; + } + + @Bean("omsBackgroundPool") + public Executor initBackgroundPool() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors()); + executor.setQueueCapacity(8192); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("omsBackgroundPool-"); + executor.setRejectedExecutionHandler(new LogOnRejected()); return executor; } @@ -65,4 +76,11 @@ public class ThreadPoolConfig { return scheduler; } + private static final class LogOnRejected implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor p) { + log.error("[OmsThreadPool] Task({}) rejected from pool({}).", r, p); + } + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index 025a5720..6a71ad1f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -170,15 +170,9 @@ public class InstanceLogService { * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 * @param instanceId 任务实例ID */ - @Async("omsCommonPool") + @Async("omsBackgroundPool") public void sync(Long instanceId) { - // 休眠10秒等待全部数据上报(OmsLogHandler 每隔5秒上报数据) - try { - TimeUnit.SECONDS.sleep(15); - }catch (Exception ignore) { - } - Stopwatch sw = Stopwatch.createStarted(); try { // 先持久化到本地文件 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index a49e084c..d6938678 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -143,7 +143,7 @@ public class InstanceManager { log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name()); // 上报日志数据 - instanceLogService.sync(instanceId); + HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 15, TimeUnit.SECONDS); // workflow 特殊处理 if (wfInstanceId != null) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java index 193add88..a6667d51 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java @@ -10,8 +10,12 @@ import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTime */ public class HashedWheelTimerHolder { + // 精确时间轮,每 1S 走一格 public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4); + // 非精确时间轮,每 5S 走一格 + public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0); + private HashedWheelTimerHolder() { } }