feat: support random dispatch strategy #205

This commit is contained in:
tjq 2021-02-22 00:46:11 +08:00
parent 823a47303b
commit bd9224a805
8 changed files with 75 additions and 1 deletions

View File

@ -125,6 +125,12 @@ public class SaveJobInfoRequest {
*/ */
private List<Long> notifyUserIds; private List<Long> notifyUserIds;
private String extra;
private Integer dispatchStrategy;
private String lifecycle;
/** /**
* Check non-null properties. * Check non-null properties.

View File

@ -49,4 +49,5 @@ public class JobInfoQuery extends PowerQuery {
private Date gmtModifiedLt; private Date gmtModifiedLt;
private Date gmtModifiedGt; private Date gmtModifiedGt;
private Integer dispatchStrategyEq;
} }

View File

@ -75,4 +75,10 @@ public class JobInfoDTO {
private Date gmtCreate; private Date gmtCreate;
private Date gmtModified; private Date gmtModified;
private String extra;
private Integer dispatchStrategy;
private String lifecycle;
} }

View File

@ -0,0 +1,32 @@
package com.github.kfcfans.powerjob.server.common.constans;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* DispatchStrategy
*
* @author tjq
* @since 2021/2/22
*/
@Getter
@AllArgsConstructor
public enum DispatchStrategy {
HEALTH_FIRST(1),
RANDOM(2);
private final int v;
public static DispatchStrategy of(Integer v) {
if (v == null) {
return HEALTH_FIRST;
}
for (DispatchStrategy ds : values()) {
if (v.equals(ds.v)) {
return ds;
}
}
throw new IllegalArgumentException("unknown DispatchStrategy of " + v);
}
}

View File

@ -137,4 +137,8 @@ public class JobInfoDO {
*/ */
private String extra; private String extra;
private Integer dispatchStrategy;
private String lifecycle;
} }

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.remote;
import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
@ -21,6 +22,7 @@ import org.springframework.util.StringUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.github.kfcfans.powerjob.common.InstanceStatus.*; import static com.github.kfcfans.powerjob.common.InstanceStatus.*;
@ -136,7 +138,7 @@ public class DispatchService {
// 发送请求不可靠需要一个后台线程定期轮询状态 // 发送请求不可靠需要一个后台线程定期轮询状态
WorkerInfo taskTracker = suitableWorkers.get(0); WorkerInfo taskTracker = selectTaskTracker(jobInfo, suitableWorkers);
String taskTrackerAddress = taskTracker.getAddress(); String taskTrackerAddress = taskTracker.getAddress();
transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req); transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req);
@ -182,4 +184,16 @@ public class DispatchService {
req.setThreadConcurrency(jobInfo.getConcurrency()); req.setThreadConcurrency(jobInfo.getConcurrency());
return req; return req;
} }
private WorkerInfo selectTaskTracker(JobInfoDO jobInfo, List<WorkerInfo> workerInfos) {
DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
switch (dispatchStrategy) {
case HEALTH_FIRST:
return workerInfos.get(0);
case RANDOM:
return workerInfos.get(ThreadLocalRandom.current().nextInt(workerInfos.size()));
}
// impossible, indian java
return workerInfos.get(0);
}
} }

View File

@ -7,6 +7,7 @@ import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO; import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
import com.github.kfcfans.powerjob.server.common.SJ; import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.remote.DispatchService; import com.github.kfcfans.powerjob.server.remote.DispatchService;
import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer;
@ -78,6 +79,7 @@ public class JobService {
jobInfoDO.setProcessorType(request.getProcessorType().getV()); jobInfoDO.setProcessorType(request.getProcessorType().getV());
jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV()); jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV());
jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
jobInfoDO.setDispatchStrategy(DispatchStrategy.of(request.getDispatchStrategy()).getV());
// 填充默认值非空保护防止 NPE // 填充默认值非空保护防止 NPE
fillDefaultValue(jobInfoDO); fillDefaultValue(jobInfoDO);

View File

@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.server.common.SJ; import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.DispatchStrategy;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -89,6 +90,12 @@ public class JobInfoVO {
// 报警用户ID列表 // 报警用户ID列表
private List<String> notifyUserIds; private List<String> notifyUserIds;
private String extra;
private String dispatchStrategy;
private String lifecycle;
public static JobInfoVO from(JobInfoDO jobInfoDO) { public static JobInfoVO from(JobInfoDO jobInfoDO) {
JobInfoVO jobInfoVO = new JobInfoVO(); JobInfoVO jobInfoVO = new JobInfoVO();
BeanUtils.copyProperties(jobInfoDO, jobInfoVO); BeanUtils.copyProperties(jobInfoDO, jobInfoVO);
@ -96,11 +103,13 @@ public class JobInfoVO {
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
ExecuteType executeType = ExecuteType.of(jobInfoDO.getExecuteType()); ExecuteType executeType = ExecuteType.of(jobInfoDO.getExecuteType());
ProcessorType processorType = ProcessorType.of(jobInfoDO.getProcessorType()); ProcessorType processorType = ProcessorType.of(jobInfoDO.getProcessorType());
DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfoDO.getDispatchStrategy());
jobInfoVO.setTimeExpressionType(timeExpressionType.name()); jobInfoVO.setTimeExpressionType(timeExpressionType.name());
jobInfoVO.setExecuteType(executeType.name()); jobInfoVO.setExecuteType(executeType.name());
jobInfoVO.setProcessorType(processorType.name()); jobInfoVO.setProcessorType(processorType.name());
jobInfoVO.setEnable(jobInfoDO.getStatus() == SwitchableStatus.ENABLE.getV()); jobInfoVO.setEnable(jobInfoDO.getStatus() == SwitchableStatus.ENABLE.getV());
jobInfoVO.setDispatchStrategy(dispatchStrategy.name());
if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) { if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) {
jobInfoVO.setNotifyUserIds(SJ.commaSplitter.splitToList(jobInfoDO.getNotifyUserIds())); jobInfoVO.setNotifyUserIds(SJ.commaSplitter.splitToList(jobInfoDO.getNotifyUserIds()));