fix: he bug of calculate thread pool size in ProcessorTracker

This commit is contained in:
tjq 2021-01-16 22:56:40 +08:00
parent dd66c8fd31
commit 3cb9ddf31f

View File

@ -198,8 +198,8 @@ public class ProcessorTracker {
int poolSize = calThreadPoolSize();
// 待执行队列为了防止对内存造成较大压力内存队列不能太大
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE);
// 自定义线程池中线程名称
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build();
// 自定义线程池中线程名称 (PowerJob Processor Pool -> PPP)
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build();
// 拒绝策略直接抛出异常
RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy();
@ -214,8 +214,8 @@ public class ProcessorTracker {
*/
private void initTimingJob() {
// 全称 oms-ProcessTracker-TimingPool
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-ProcessorTrackerTimingPool-%d").build();
// PowerJob Processor TimingPool
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build();
timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);
timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS);
@ -340,13 +340,14 @@ public class ProcessorTracker {
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());
if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
return instanceInfo.getThreadConcurrency();
}
// 脚本类自带线程池不过为了少一点逻辑判断还是象征性分配一个线程
if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {
return 1;
}
if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
return instanceInfo.getThreadConcurrency();
}
if (TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) {
return instanceInfo.getThreadConcurrency();
}