From 2db0f05feb20c765a3db1007a766e9bf61a84971 Mon Sep 17 00:00:00 2001 From: tjq Date: Mon, 12 Sep 2022 11:33:13 +0800 Subject: [PATCH] feat: optimize thread pool config --- .../server/core/instance/InstanceLogService.java | 13 ++++++------- .../powerjob/server/config/ThreadPoolConfig.java | 7 ++++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java index dea809b4..9f75587a 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java @@ -1,5 +1,7 @@ package tech.powerjob.server.core.instance; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; import tech.powerjob.common.enums.LogLevel; import tech.powerjob.common.OmsConstant; import tech.powerjob.common.enums.TimeExpressionType; @@ -68,7 +70,9 @@ public class InstanceLogService { * 本地维护了在线日志的任务实例ID */ private final Map instanceId2LastReportTime = Maps.newConcurrentMap(); - private final ExecutorService workerPool; + + @Resource(name = PJThreadPool.BACKGROUND_POOL) + private AsyncTaskExecutor powerJobBackgroundPool; /** * 分段锁 @@ -88,11 +92,6 @@ public class InstanceLogService { */ private static final long EXPIRE_INTERVAL_MS = 60000; - public InstanceLogService() { - int coreSize = Runtime.getRuntime().availableProcessors(); - workerPool = new ThreadPoolExecutor(coreSize, coreSize, 1, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue()); - } - /** * 提交日志记录,持久化到本地数据库中 * @param workerAddress 上报机器地址 @@ -192,7 +191,7 @@ public class InstanceLogService { * @return 异步结果 */ private Future prepareLogFile(long instanceId) { - return workerPool.submit(() -> { + return powerJobBackgroundPool.submit(() -> { // 在线日志还在不断更新,需要使用本地数据库中的数据 if (instanceId2LastReportTime.containsKey(instanceId)) { return genTemporaryLogFile(instanceId); diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java index f3ce629e..99b61ade 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java @@ -1,5 +1,6 @@ package tech.powerjob.server.config; +import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import tech.powerjob.server.common.RejectedExecutionHandlerFactory; import lombok.extern.slf4j.Slf4j; @@ -25,7 +26,7 @@ import java.util.concurrent.*; public class ThreadPoolConfig { @Bean(PJThreadPool.TIMING_POOL) - public Executor getTimingPool() { + public TaskExecutor getTimingPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); @@ -38,7 +39,7 @@ public class ThreadPoolConfig { } @Bean(PJThreadPool.BACKGROUND_POOL) - public Executor initBackgroundPool() { + public AsyncTaskExecutor initBackgroundPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 8); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 16); @@ -50,7 +51,7 @@ public class ThreadPoolConfig { } @Bean(PJThreadPool.LOCAL_DB_POOL) - public Executor initOmsLocalDbPool() { + public TaskExecutor initOmsLocalDbPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() / 2); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() / 2);