diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/constants/PJThreadPool.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/constants/PJThreadPool.java new file mode 100644 index 00000000..ef8b2b06 --- /dev/null +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/constants/PJThreadPool.java @@ -0,0 +1,26 @@ +package tech.powerjob.server.common.constants; + +/** + * 线程池 + * + * @author tjq + * @since 2022/9/12 + */ +public class PJThreadPool { + + /** + * 定时调度用线程池 + */ + public static final String TIMING_POOL = "PowerJobTimingPool"; + + /** + * 后台任务异步线程池 + */ + public static final String BACKGROUND_POOL = "PowerJobBackgroundPool"; + + /** + * 本地数据库专用线程池 + */ + public static final String LOCAL_DB_POOL = "PowerJobLocalDbPool"; + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java index 4ffc21d0..dea809b4 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java @@ -7,6 +7,7 @@ import tech.powerjob.common.model.InstanceLogContent; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.SegmentLock; +import tech.powerjob.server.common.constants.PJThreadPool; import tech.powerjob.server.remote.server.redirector.DesignateServer; import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.persistence.StringPage; @@ -51,9 +52,6 @@ public class InstanceLogService { @Value("${server.port}") private int port; - @Resource - private Executor omsLocalDbPool; - @Resource private InstanceMetadataService instanceMetadataService; @Resource @@ -100,24 +98,23 @@ public class InstanceLogService { * @param workerAddress 上报机器地址 * @param logs 任务实例运行时日志 */ + @Async(value = PJThreadPool.LOCAL_DB_POOL) public void submitLogs(String workerAddress, List logs) { - omsLocalDbPool.execute(() -> { - List logList = logs.stream().map(x -> { - instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis()); + List logList = logs.stream().map(x -> { + instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis()); - LocalInstanceLogDO y = new LocalInstanceLogDO(); - BeanUtils.copyProperties(x, y); - y.setWorkerAddress(workerAddress); - return y; - }).collect(Collectors.toList()); + LocalInstanceLogDO y = new LocalInstanceLogDO(); + BeanUtils.copyProperties(x, y); + y.setWorkerAddress(workerAddress); + return y; + }).collect(Collectors.toList()); - try { - CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList)); - }catch (Exception e) { - log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e); - } - }); + try { + CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList)); + }catch (Exception e) { + log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e); + } } /** @@ -208,7 +205,7 @@ public class InstanceLogService { * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 * @param instanceId 任务实例ID */ - @Async("omsBackgroundPool") + @Async(PJThreadPool.BACKGROUND_POOL) public void sync(Long instanceId) { Stopwatch sw = Stopwatch.createStarted(); @@ -350,7 +347,7 @@ public class InstanceLogService { } - @Async("omsTimingPool") + @Async(PJThreadPool.TIMING_POOL) @Scheduled(fixedDelay = 120000) public void timingCheck() { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java index 03ba4ffb..d9c9e137 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java @@ -2,6 +2,7 @@ package tech.powerjob.server.core.scheduler; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.WorkflowInstanceStatus; +import tech.powerjob.server.common.constants.PJThreadPool; import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; @@ -62,7 +63,7 @@ public class CleanService { private static final String HISTORY_DELETE_LOCK = "history_delete_lock"; - @Async("omsTimingPool") + @Async(PJThreadPool.TIMING_POOL) @Scheduled(cron = CLEAN_TIME_EXPRESSION) public void timingClean() { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java index 4ca3e327..b324923e 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java @@ -4,6 +4,7 @@ import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.enums.WorkflowInstanceStatus; +import tech.powerjob.server.common.constants.PJThreadPool; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.remote.transport.starter.AkkaStarter; import tech.powerjob.server.persistence.remote.model.*; @@ -59,7 +60,7 @@ public class InstanceStatusCheckService { @Resource private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; - @Async("omsTimingPool") + @Async(PJThreadPool.TIMING_POOL) @Scheduled(fixedDelay = 10000) public void timingStatusCheck() { Stopwatch stopwatch = Stopwatch.createStarted(); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java index 7e46c452..862acf8e 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java @@ -3,6 +3,7 @@ package tech.powerjob.server.core.scheduler; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.LifeCycle; +import tech.powerjob.server.common.constants.PJThreadPool; import tech.powerjob.server.remote.transport.starter.AkkaStarter; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.persistence.remote.model.AppInfoDO; @@ -73,7 +74,7 @@ public class PowerScheduleService { private static final long SCHEDULE_RATE = 15000; - @Async("omsTimingPool") + @Async(PJThreadPool.TIMING_POOL) @Scheduled(fixedDelay = SCHEDULE_RATE) public void timingSchedule() { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java index 75949b90..f3ce629e 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java @@ -1,5 +1,6 @@ package tech.powerjob.server.config; +import org.springframework.core.task.TaskExecutor; import tech.powerjob.server.common.RejectedExecutionHandlerFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; @@ -8,14 +9,12 @@ import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import tech.powerjob.server.common.constants.PJThreadPool; import java.util.concurrent.*; /** * 公用线程池配置 - * omsTimingPool:用于执行定时任务的线程池 - * omsBackgroundPool:用于执行后台任务的线程池,这类任务对时间不敏感,慢慢执行细水长流即可 - * taskScheduler:用于定时调度的线程池 * * @author tjq * @since 2020/4/28 @@ -25,7 +24,7 @@ import java.util.concurrent.*; @Configuration public class ThreadPoolConfig { - @Bean("omsTimingPool") + @Bean(PJThreadPool.TIMING_POOL) public Executor getTimingPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); @@ -33,31 +32,31 @@ public class ThreadPoolConfig { // use SynchronousQueue executor.setQueueCapacity(0); executor.setKeepAliveSeconds(60); - executor.setThreadNamePrefix("omsTimingPool-"); - executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PJ-TIMING")); + executor.setThreadNamePrefix("PJ-TIMING-"); + executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun(PJThreadPool.TIMING_POOL)); return executor; } - @Bean("omsBackgroundPool") + @Bean(PJThreadPool.BACKGROUND_POOL) public Executor initBackgroundPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 8); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 16); executor.setQueueCapacity(8192); executor.setKeepAliveSeconds(60); - executor.setThreadNamePrefix("omsBackgroundPool-"); - executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newDiscard("PJ-BACKGROUND")); + executor.setThreadNamePrefix("PJ-BG-"); + executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newDiscard(PJThreadPool.BACKGROUND_POOL)); return executor; } - @Bean("omsLocalDbPool") + @Bean(PJThreadPool.LOCAL_DB_POOL) public Executor initOmsLocalDbPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() / 2); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() / 2); executor.setQueueCapacity(2048); - executor.setThreadNamePrefix("omsLocalDbPool-"); - executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newAbort("PJ-LOCAL-DB")); + executor.setThreadNamePrefix("PJ-LOCALDB-"); + executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newAbort(PJThreadPool.LOCAL_DB_POOL)); return executor; }