[dev] optimize status check when dispatch job to worker

This commit is contained in:
tjq 2020-07-14 17:31:47 +08:00
parent 70920a9220
commit 40d5dfc549
4 changed files with 17 additions and 11 deletions

View File

@ -28,6 +28,8 @@ public enum InstanceStatus {
// 广义的运行状态
public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
// 结束状态
public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);
public static InstanceStatus of(int v) {
for (InstanceStatus is : values()) {

View File

@ -46,8 +46,8 @@ public class SaveJobInfoRequest {
/* ************************** 运行时配置 ************************** */
// 最大同时运行任务数
private Integer maxInstanceNum = 1;
// 最大同时运行任务数0 代表不限
private Integer maxInstanceNum = 0;
// 并发度同时执行的线程数量
private Integer concurrency = 5;
// 任务整体超时时间

View File

@ -63,7 +63,7 @@ public class ServerActor extends AbstractActor {
getInstanceManager().updateStatus(req);
// 结束状态成功/失败需要回复消息
if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) {
if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) {
getSender().tell(AskResponse.succeed(null), getSelf());
}
}catch (Exception e) {

View File

@ -68,16 +68,20 @@ public class DispatchService {
// 查询当前运行的实例数
long current = System.currentTimeMillis();
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus);
// 超出最大同时运行限制不执行调度
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
// 0 代表不限制在线任务还能省去一次 DB 查询
if (jobInfo.getMaxInstanceNum() > 0) {
instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);
return;
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()));
// 超出最大同时运行限制不执行调度
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);
return;
}
}
// 获取当前所有可用的Worker