From 62aeb9c08037e6fc1d753a6e3dfe728dea096f6b Mon Sep 17 00:00:00 2001 From: ocean23 <43363687@qq.com> Date: Sat, 16 Jan 2021 16:48:14 +0800 Subject: [PATCH 1/2] fix: solved fixed Rate(Delay) concurrency #174 --- .../worker/core/tracker/processor/ProcessorTracker.java | 7 ++++++- .../worker/core/tracker/task/FrequentTaskTracker.java | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index f4909ae3..b55fb163 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -340,7 +340,12 @@ public class ProcessorTracker { ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType()); - if (executeType == ExecuteType.MAP_REDUCE) { + if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) { + return instanceInfo.getThreadConcurrency(); + } + boolean exeStandalone = executeType == ExecuteType.STANDALONE; + boolean fixedJob = TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType()); + if (exeStandalone && fixedJob) { return instanceInfo.getThreadConcurrency(); } // 脚本类自带线程池,不过为了少一点逻辑判断,还是象征性分配一个线程 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index a50029b3..4cef1603 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -173,7 +173,7 @@ public class FrequentTaskTracker extends TaskTracker { // 判断是否超出最大执行实例数 if (maxInstanceNum > 0) { if (timeExpressionType == TimeExpressionType.FIXED_RATE) { - if (subInstanceId2TimeHolder.size() > maxInstanceNum) { + if (subInstanceId2TimeHolder.size() >= maxInstanceNum) { log.warn("[FQTaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId); processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE"); return; From e429c6f59e31cb217b305b69ce26adae5254e8fd Mon Sep 17 00:00:00 2001 From: ocean23 <43363687@qq.com> Date: Sat, 16 Jan 2021 17:14:10 +0800 Subject: [PATCH 2/2] fix: modify by suggestion --- .../worker/core/tracker/processor/ProcessorTracker.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index b55fb163..1dd980d0 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -343,15 +343,13 @@ public class ProcessorTracker { if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) { return instanceInfo.getThreadConcurrency(); } - boolean exeStandalone = executeType == ExecuteType.STANDALONE; - boolean fixedJob = TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType()); - if (exeStandalone && fixedJob) { - return instanceInfo.getThreadConcurrency(); - } // 脚本类自带线程池,不过为了少一点逻辑判断,还是象征性分配一个线程 if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) { return 1; } + if (TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) { + return instanceInfo.getThreadConcurrency(); + } return 2; }