From bd9224a805eefc8ba59434e649a6dd28017de707 Mon Sep 17 00:00:00 2001 From: tjq Date: Mon, 22 Feb 2021 00:46:11 +0800 Subject: [PATCH] feat: support random dispatch strategy #205 --- .../request/http/SaveJobInfoRequest.java | 6 ++++ .../common/request/query/JobInfoQuery.java | 1 + .../powerjob/common/response/JobInfoDTO.java | 6 ++++ .../common/constans/DispatchStrategy.java | 32 +++++++++++++++++++ .../persistence/core/model/JobInfoDO.java | 4 +++ .../server/remote/DispatchService.java | 16 +++++++++- .../powerjob/server/service/JobService.java | 2 ++ .../server/web/response/JobInfoVO.java | 9 ++++++ 8 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/DispatchStrategy.java diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java index 2f2e25a0..61199ae5 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java @@ -125,6 +125,12 @@ public class SaveJobInfoRequest { */ private List notifyUserIds; + private String extra; + + private Integer dispatchStrategy; + + private String lifecycle; + /** * Check non-null properties. diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/query/JobInfoQuery.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/query/JobInfoQuery.java index 754fbd71..c5b8e3e1 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/query/JobInfoQuery.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/query/JobInfoQuery.java @@ -49,4 +49,5 @@ public class JobInfoQuery extends PowerQuery { private Date gmtModifiedLt; private Date gmtModifiedGt; + private Integer dispatchStrategyEq; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/JobInfoDTO.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/JobInfoDTO.java index 68a53822..3dcca706 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/JobInfoDTO.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/JobInfoDTO.java @@ -75,4 +75,10 @@ public class JobInfoDTO { private Date gmtCreate; private Date gmtModified; + + private String extra; + + private Integer dispatchStrategy; + + private String lifecycle; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/DispatchStrategy.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/DispatchStrategy.java new file mode 100644 index 00000000..3fb5b2f4 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/DispatchStrategy.java @@ -0,0 +1,32 @@ +package com.github.kfcfans.powerjob.server.common.constans; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * DispatchStrategy + * + * @author tjq + * @since 2021/2/22 + */ +@Getter +@AllArgsConstructor +public enum DispatchStrategy { + + HEALTH_FIRST(1), + RANDOM(2); + + private final int v; + + public static DispatchStrategy of(Integer v) { + if (v == null) { + return HEALTH_FIRST; + } + for (DispatchStrategy ds : values()) { + if (v.equals(ds.v)) { + return ds; + } + } + throw new IllegalArgumentException("unknown DispatchStrategy of " + v); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java index bec5eec0..acb58b1c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java @@ -137,4 +137,8 @@ public class JobInfoDO { */ private String extra; + private Integer dispatchStrategy; + + private String lifecycle; + } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java index 356b699a..d24ea007 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.remote; import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; +import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; @@ -21,6 +22,7 @@ import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.util.Date; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import static com.github.kfcfans.powerjob.common.InstanceStatus.*; @@ -136,7 +138,7 @@ public class DispatchService { // 发送请求(不可靠,需要一个后台线程定期轮询状态) - WorkerInfo taskTracker = suitableWorkers.get(0); + WorkerInfo taskTracker = selectTaskTracker(jobInfo, suitableWorkers); String taskTrackerAddress = taskTracker.getAddress(); transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req); @@ -182,4 +184,16 @@ public class DispatchService { req.setThreadConcurrency(jobInfo.getConcurrency()); return req; } + + private WorkerInfo selectTaskTracker(JobInfoDO jobInfo, List workerInfos) { + DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy()); + switch (dispatchStrategy) { + case HEALTH_FIRST: + return workerInfos.get(0); + case RANDOM: + return workerInfos.get(ThreadLocalRandom.current().nextInt(workerInfos.size())); + } + // impossible, indian java + return workerInfos.get(0); + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 05d54d75..4fe4372c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -7,6 +7,7 @@ import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.JobInfoDTO; import com.github.kfcfans.powerjob.server.common.SJ; +import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.remote.DispatchService; import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; @@ -78,6 +79,7 @@ public class JobService { jobInfoDO.setProcessorType(request.getProcessorType().getV()); jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV()); jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); + jobInfoDO.setDispatchStrategy(DispatchStrategy.of(request.getDispatchStrategy()).getV()); // 填充默认值,非空保护防止 NPE fillDefaultValue(jobInfoDO); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java index 7145ae26..ee0ed645 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java @@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.common.ProcessorType; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.server.common.SJ; +import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.google.common.collect.Lists; @@ -89,6 +90,12 @@ public class JobInfoVO { // 报警用户ID列表 private List notifyUserIds; + private String extra; + + private String dispatchStrategy; + + private String lifecycle; + public static JobInfoVO from(JobInfoDO jobInfoDO) { JobInfoVO jobInfoVO = new JobInfoVO(); BeanUtils.copyProperties(jobInfoDO, jobInfoVO); @@ -96,11 +103,13 @@ public class JobInfoVO { TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); ExecuteType executeType = ExecuteType.of(jobInfoDO.getExecuteType()); ProcessorType processorType = ProcessorType.of(jobInfoDO.getProcessorType()); + DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfoDO.getDispatchStrategy()); jobInfoVO.setTimeExpressionType(timeExpressionType.name()); jobInfoVO.setExecuteType(executeType.name()); jobInfoVO.setProcessorType(processorType.name()); jobInfoVO.setEnable(jobInfoDO.getStatus() == SwitchableStatus.ENABLE.getV()); + jobInfoVO.setDispatchStrategy(dispatchStrategy.name()); if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) { jobInfoVO.setNotifyUserIds(SJ.commaSplitter.splitToList(jobInfoDO.getNotifyUserIds()));