From 3cb9ddf31f5649c6b97ea1cba948a334dd20906d Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 16 Jan 2021 22:56:40 +0800 Subject: [PATCH] fix: he bug of calculate thread pool size in ProcessorTracker --- .../core/tracker/processor/ProcessorTracker.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 1dd980d0..c7ed24aa 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -198,8 +198,8 @@ public class ProcessorTracker { int poolSize = calThreadPoolSize(); // 待执行队列,为了防止对内存造成较大压力,内存队列不能太大 BlockingQueue queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE); - // 自定义线程池中线程名称 - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build(); + // 自定义线程池中线程名称 (PowerJob Processor Pool -> PPP) + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build(); // 拒绝策略:直接抛出异常 RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy(); @@ -214,8 +214,8 @@ public class ProcessorTracker { */ private void initTimingJob() { - // 全称 oms-ProcessTracker-TimingPool - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-ProcessorTrackerTimingPool-%d").build(); + // PowerJob Processor TimingPool + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build(); timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory); timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS); @@ -340,13 +340,14 @@ public class ProcessorTracker { ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType()); - if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) { - return instanceInfo.getThreadConcurrency(); - } // 脚本类自带线程池,不过为了少一点逻辑判断,还是象征性分配一个线程 if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) { return 1; } + + if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) { + return instanceInfo.getThreadConcurrency(); + } if (TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) { return instanceInfo.getThreadConcurrency(); }