From 40d5dfc549c4658b6e728c5160d57bb5dcdced31 Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 14 Jul 2020 17:31:47 +0800 Subject: [PATCH] [dev] optimize status check when dispatch job to worker --- .../powerjob/common/InstanceStatus.java | 2 ++ .../request/http/SaveJobInfoRequest.java | 4 ++-- .../server/akka/actors/ServerActor.java | 2 +- .../server/service/DispatchService.java | 20 +++++++++++-------- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java index 7729b75b..749926db 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java @@ -28,6 +28,8 @@ public enum InstanceStatus { // 广义的运行状态 public static final List generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v); + // 结束状态 + public static final List finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v); public static InstanceStatus of(int v) { for (InstanceStatus is : values()) { diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java index ca3b5c2c..ae7a24e2 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java @@ -46,8 +46,8 @@ public class SaveJobInfoRequest { /* ************************** 运行时配置 ************************** */ - // 最大同时运行任务数 - private Integer maxInstanceNum = 1; + // 最大同时运行任务数,0 代表不限 + private Integer maxInstanceNum = 0; // 并发度,同时执行的线程数量 private Integer concurrency = 5; // 任务整体超时时间 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java index fc83f9a2..fdd82196 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java @@ -63,7 +63,7 @@ public class ServerActor extends AbstractActor { getInstanceManager().updateStatus(req); // 结束状态(成功/失败)需要回复消息 - if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) { + if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) { getSender().tell(AskResponse.succeed(null), getSelf()); } }catch (Exception e) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index 7f5200a8..13281259 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -68,16 +68,20 @@ public class DispatchService { // 查询当前运行的实例数 long current = System.currentTimeMillis(); - long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus); - // 超出最大同时运行限制,不执行调度 - if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { - String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); - log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount); - instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); + // 0 代表不限制在线任务,还能省去一次 DB 查询 + if (jobInfo.getMaxInstanceNum() > 0) { - instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); - return; + long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV())); + // 超出最大同时运行限制,不执行调度 + if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { + String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); + log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount); + instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); + + instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); + return; + } } // 获取当前所有可用的Worker