feat: optimize threadpool config

This commit is contained in:
tjq 2022-09-12 11:07:05 +08:00
parent 3b73a750e6
commit d531bf3a22
6 changed files with 59 additions and 34 deletions

View File

@ -0,0 +1,26 @@
package tech.powerjob.server.common.constants;
/**
* 线程池
*
* @author tjq
* @since 2022/9/12
*/
public class PJThreadPool {
/**
* 定时调度用线程池
*/
public static final String TIMING_POOL = "PowerJobTimingPool";
/**
* 后台任务异步线程池
*/
public static final String BACKGROUND_POOL = "PowerJobBackgroundPool";
/**
* 本地数据库专用线程池
*/
public static final String LOCAL_DB_POOL = "PowerJobLocalDbPool";
}

View File

@ -7,6 +7,7 @@ import tech.powerjob.common.model.InstanceLogContent;
import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.SegmentLock; import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.remote.server.redirector.DesignateServer; import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.persistence.StringPage; import tech.powerjob.server.persistence.StringPage;
@ -51,9 +52,6 @@ public class InstanceLogService {
@Value("${server.port}") @Value("${server.port}")
private int port; private int port;
@Resource
private Executor omsLocalDbPool;
@Resource @Resource
private InstanceMetadataService instanceMetadataService; private InstanceMetadataService instanceMetadataService;
@Resource @Resource
@ -100,9 +98,9 @@ public class InstanceLogService {
* @param workerAddress 上报机器地址 * @param workerAddress 上报机器地址
* @param logs 任务实例运行时日志 * @param logs 任务实例运行时日志
*/ */
@Async(value = PJThreadPool.LOCAL_DB_POOL)
public void submitLogs(String workerAddress, List<InstanceLogContent> logs) { public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {
omsLocalDbPool.execute(() -> {
List<LocalInstanceLogDO> logList = logs.stream().map(x -> { List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis()); instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());
@ -117,7 +115,6 @@ public class InstanceLogService {
}catch (Exception e) { }catch (Exception e) {
log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e); log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
} }
});
} }
/** /**
@ -208,7 +205,7 @@ public class InstanceLogService {
* 将本地的任务实例运行日志同步到 mongoDB 存储在任务执行结束后异步执行 * 将本地的任务实例运行日志同步到 mongoDB 存储在任务执行结束后异步执行
* @param instanceId 任务实例ID * @param instanceId 任务实例ID
*/ */
@Async("omsBackgroundPool") @Async(PJThreadPool.BACKGROUND_POOL)
public void sync(Long instanceId) { public void sync(Long instanceId) {
Stopwatch sw = Stopwatch.createStarted(); Stopwatch sw = Stopwatch.createStarted();
@ -350,7 +347,7 @@ public class InstanceLogService {
} }
@Async("omsTimingPool") @Async(PJThreadPool.TIMING_POOL)
@Scheduled(fixedDelay = 120000) @Scheduled(fixedDelay = 120000)
public void timingCheck() { public void timingCheck() {

View File

@ -2,6 +2,7 @@ package tech.powerjob.server.core.scheduler;
import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus; import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
@ -62,7 +63,7 @@ public class CleanService {
private static final String HISTORY_DELETE_LOCK = "history_delete_lock"; private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
@Async("omsTimingPool") @Async(PJThreadPool.TIMING_POOL)
@Scheduled(cron = CLEAN_TIME_EXPRESSION) @Scheduled(cron = CLEAN_TIME_EXPRESSION)
public void timingClean() { public void timingClean() {

View File

@ -4,6 +4,7 @@ import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowInstanceStatus; import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.remote.transport.starter.AkkaStarter; import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.persistence.remote.model.*; import tech.powerjob.server.persistence.remote.model.*;
@ -59,7 +60,7 @@ public class InstanceStatusCheckService {
@Resource @Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Async("omsTimingPool") @Async(PJThreadPool.TIMING_POOL)
@Scheduled(fixedDelay = 10000) @Scheduled(fixedDelay = 10000)
public void timingStatusCheck() { public void timingStatusCheck() {
Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch = Stopwatch.createStarted();

View File

@ -3,6 +3,7 @@ package tech.powerjob.server.core.scheduler;
import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.remote.transport.starter.AkkaStarter; import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.persistence.remote.model.AppInfoDO; import tech.powerjob.server.persistence.remote.model.AppInfoDO;
@ -73,7 +74,7 @@ public class PowerScheduleService {
private static final long SCHEDULE_RATE = 15000; private static final long SCHEDULE_RATE = 15000;
@Async("omsTimingPool") @Async(PJThreadPool.TIMING_POOL)
@Scheduled(fixedDelay = SCHEDULE_RATE) @Scheduled(fixedDelay = SCHEDULE_RATE)
public void timingSchedule() { public void timingSchedule() {

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.config; package tech.powerjob.server.config;
import org.springframework.core.task.TaskExecutor;
import tech.powerjob.server.common.RejectedExecutionHandlerFactory; import tech.powerjob.server.common.RejectedExecutionHandlerFactory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -8,14 +9,12 @@ import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import tech.powerjob.server.common.constants.PJThreadPool;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
* 公用线程池配置 * 公用线程池配置
* omsTimingPool用于执行定时任务的线程池
* omsBackgroundPool用于执行后台任务的线程池这类任务对时间不敏感慢慢执行细水长流即可
* taskScheduler用于定时调度的线程池
* *
* @author tjq * @author tjq
* @since 2020/4/28 * @since 2020/4/28
@ -25,7 +24,7 @@ import java.util.concurrent.*;
@Configuration @Configuration
public class ThreadPoolConfig { public class ThreadPoolConfig {
@Bean("omsTimingPool") @Bean(PJThreadPool.TIMING_POOL)
public Executor getTimingPool() { public Executor getTimingPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
@ -33,31 +32,31 @@ public class ThreadPoolConfig {
// use SynchronousQueue // use SynchronousQueue
executor.setQueueCapacity(0); executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(60); executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsTimingPool-"); executor.setThreadNamePrefix("PJ-TIMING-");
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PJ-TIMING")); executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun(PJThreadPool.TIMING_POOL));
return executor; return executor;
} }
@Bean("omsBackgroundPool") @Bean(PJThreadPool.BACKGROUND_POOL)
public Executor initBackgroundPool() { public Executor initBackgroundPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 8); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 8);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 16); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 16);
executor.setQueueCapacity(8192); executor.setQueueCapacity(8192);
executor.setKeepAliveSeconds(60); executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsBackgroundPool-"); executor.setThreadNamePrefix("PJ-BG-");
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newDiscard("PJ-BACKGROUND")); executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newDiscard(PJThreadPool.BACKGROUND_POOL));
return executor; return executor;
} }
@Bean("omsLocalDbPool") @Bean(PJThreadPool.LOCAL_DB_POOL)
public Executor initOmsLocalDbPool() { public Executor initOmsLocalDbPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() / 2); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() / 2);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() / 2); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() / 2);
executor.setQueueCapacity(2048); executor.setQueueCapacity(2048);
executor.setThreadNamePrefix("omsLocalDbPool-"); executor.setThreadNamePrefix("PJ-LOCALDB-");
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newAbort("PJ-LOCAL-DB")); executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newAbort(PJThreadPool.LOCAL_DB_POOL));
return executor; return executor;
} }