diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index f4909ae3..1dd980d0 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -340,13 +340,16 @@ public class ProcessorTracker { ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType()); - if (executeType == ExecuteType.MAP_REDUCE) { + if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) { return instanceInfo.getThreadConcurrency(); } // 脚本类自带线程池,不过为了少一点逻辑判断,还是象征性分配一个线程 if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) { return 1; } + if (TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) { + return instanceInfo.getThreadConcurrency(); + } return 2; } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index a50029b3..4cef1603 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -173,7 +173,7 @@ public class FrequentTaskTracker extends TaskTracker { // 判断是否超出最大执行实例数 if (maxInstanceNum > 0) { if (timeExpressionType == TimeExpressionType.FIXED_RATE) { - if (subInstanceId2TimeHolder.size() > maxInstanceNum) { + if (subInstanceId2TimeHolder.size() >= maxInstanceNum) { log.warn("[FQTaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId); processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE"); return;