refactor: optimize thread pool config

This commit is contained in:
tjq 2021-01-02 19:52:22 +08:00
parent 9a661aa177
commit 6a85995937
3 changed files with 5 additions and 5 deletions

View File

@ -28,8 +28,8 @@ public class ThreadPoolConfig {
@Bean("omsTimingPool") @Bean("omsTimingPool")
public Executor getTimingPool() { public Executor getTimingPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 16); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 32); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
// use SynchronousQueue // use SynchronousQueue
executor.setQueueCapacity(0); executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(60); executor.setKeepAliveSeconds(60);

View File

@ -60,7 +60,7 @@ public class InstanceStatusCheckService {
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Async("omsTimingPool") @Async("omsTimingPool")
@Scheduled(fixedRate = 10000) @Scheduled(fixedDelay = 10000)
public void timingStatusCheck() { public void timingStatusCheck() {
Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch = Stopwatch.createStarted();
@ -115,7 +115,7 @@ public class InstanceStatusCheckService {
threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS; threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
List<InstanceInfoDO> waitingWorkerReceiveInstances = instanceInfoRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold); List<InstanceInfoDO> waitingWorkerReceiveInstances = instanceInfoRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) { if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) {
log.warn("[InstanceStatusChecker] instances({}) didn't receive any reply from worker.", waitingWorkerReceiveInstances); log.warn("[InstanceStatusChecker] find one instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances);
waitingWorkerReceiveInstances.forEach(instance -> { waitingWorkerReceiveInstances.forEach(instance -> {
// 重新派发 // 重新派发
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new);

View File

@ -133,7 +133,7 @@ public class OmsScheduleService {
// 1. 批量写日志表 // 1. 批量写日志表
Map<Long, Long> jobId2InstanceId = Maps.newHashMap(); Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
log.info("[CronScheduler] These cron jobs will be scheduled {}.", jobInfos); log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos);
jobInfos.forEach(jobInfo -> { jobInfos.forEach(jobInfo -> {
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), null, null, jobInfo.getNextTriggerTime()); Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), null, null, jobInfo.getNextTriggerTime());