feat: Supports selection of TaskTracker nodes for execution

This commit is contained in:
tjq 2024-02-24 19:59:50 +08:00
parent c717fd3fb8
commit 815d44ef7e
16 changed files with 331 additions and 20 deletions

View File

@ -13,8 +13,19 @@ import lombok.Getter;
@AllArgsConstructor
public enum DispatchStrategy {
/**
* 健康度优先
*/
HEALTH_FIRST(1),
RANDOM(2);
/**
* 随机
*/
RANDOM(2),
/**
* 指定执行
*/
SPECIFY(11)
;
private final int v;

View File

@ -134,6 +134,11 @@ public class SaveJobInfoRequest {
private DispatchStrategy dispatchStrategy;
/**
* 某种派发策略背后的具体配置值取决于 dispatchStrategy
*/
private String dispatchStrategyConfig;
private LifeCycle lifeCycle;
/**
* alarm config

View File

@ -121,7 +121,14 @@ public class JobInfoDTO {
private String extra;
/**
* 派发策略
*/
private Integer dispatchStrategy;
/**
* 某种派发策略背后的具体配置值取决于 dispatchStrategy
*/
private String dispatchStrategyConfig;
private String lifecycle;

View File

@ -26,6 +26,7 @@ import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository
import tech.powerjob.server.remote.transporter.TransportService;
import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import tech.powerjob.server.remote.worker.selector.TaskTrackerSelectorService;
import java.util.ArrayList;
import java.util.Date;
@ -58,6 +59,8 @@ public class DispatchService {
private final InstanceInfoRepository instanceInfoRepository;
private final TaskTrackerSelectorService taskTrackerSelectorService;
/**
* 异步重新派发
*
@ -145,7 +148,7 @@ public class DispatchService {
}
// 获取当前最合适的 worker 列表
List<WorkerInfo> suitableWorkers = workerClusterQueryService.getSuitableWorkers(jobInfo);
List<WorkerInfo> suitableWorkers = workerClusterQueryService.geAvailableWorkers(jobInfo);
if (CollectionUtils.isEmpty(suitableWorkers)) {
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", jobId, instanceId);
@ -167,7 +170,7 @@ public class DispatchService {
ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList);
// 发送请求不可靠需要一个后台线程定期轮询状态
WorkerInfo taskTracker = suitableWorkers.get(0);
WorkerInfo taskTracker = taskTrackerSelectorService.select(jobInfo, instanceInfo, suitableWorkers);
String taskTrackerAddress = taskTracker.getAddress();
URL workerUrl = ServerURLFactory.dispatchJob2Worker(taskTrackerAddress);

View File

@ -134,7 +134,7 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
if (!jobInfo.getAppId().equals(appId)) {
askResponse = AskResponse.failed("Permission Denied!");
}else {
List<String> sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo)
List<String> sortedAvailableWorker = workerClusterQueryService.geAvailableWorkers(jobInfo)
.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
askResponse = AskResponse.succeed(sortedAvailableWorker);
}

View File

@ -139,7 +139,14 @@ public class JobInfoDO {
*/
private String extra;
/**
* 派发策略
*/
private Integer dispatchStrategy;
/**
* 某种派发策略背后的具体配置值取决于 dispatchStrategy
*/
private String dispatchStrategyConfig;
private String lifecycle;
/**

View File

@ -3,12 +3,11 @@ package tech.powerjob.server.remote.worker;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.remote.worker.filter.WorkerFilter;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.remote.worker.filter.WorkerFilter;
import java.util.Collections;
import java.util.List;
@ -37,24 +36,13 @@ public class WorkerClusterQueryService {
* @param jobInfo job
* @return worker cluster info, sorted by metrics desc
*/
public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {
public List<WorkerInfo> geAvailableWorkers(JobInfoDO jobInfo) {
List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());
// 过滤不符合要求的机器
workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
switch (dispatchStrategy) {
case RANDOM:
Collections.shuffle(workers);
break;
case HEALTH_FIRST:
workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
break;
default:
// do nothing
}
// 限定集群大小0代表不限制
if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
workers = workers.subList(0, jobInfo.getMaxWorkerCount());

View File

@ -0,0 +1,32 @@
package tech.powerjob.server.remote.worker.selector;
import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import java.util.List;
/**
* 主节点选择方式
*
* @author tjq
* @since 2024/2/24
*/
public interface TaskTrackerSelector {
/**
* 支持的策略
* @return 派发策略
*/
DispatchStrategy strategy();
/**
* 选择主节点
* @param jobInfoDO 任务信息
* @param instanceInfoDO 任务实例
* @param availableWorkers 可用 workers
* @return 主节点 worker
*/
WorkerInfo select(JobInfoDO jobInfoDO, InstanceInfoDO instanceInfoDO, List<WorkerInfo> availableWorkers);
}

View File

@ -0,0 +1,33 @@
package tech.powerjob.server.remote.worker.selector;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import java.util.List;
import java.util.Map;
/**
* TaskTrackerSelectorService
*
* @author tjq
* @since 2024/2/24
*/
@Service
public class TaskTrackerSelectorService {
private final Map<Integer, TaskTrackerSelector> taskTrackerSelectorMap = Maps.newHashMap();
@Autowired
public TaskTrackerSelectorService(List<TaskTrackerSelector> taskTrackerSelectors) {
taskTrackerSelectors.forEach(ts -> taskTrackerSelectorMap.put(ts.strategy().getV(), ts));
}
public WorkerInfo select(JobInfoDO jobInfoDO, InstanceInfoDO instanceInfoDO, List<WorkerInfo> availableWorkers) {
TaskTrackerSelector taskTrackerSelector = taskTrackerSelectorMap.get(jobInfoDO.getDispatchStrategy());
return taskTrackerSelector.select(jobInfoDO, instanceInfoDO, availableWorkers);
}
}

View File

@ -0,0 +1,33 @@
package tech.powerjob.server.remote.worker.selector.impl;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.remote.worker.selector.TaskTrackerSelector;
import java.util.List;
/**
* HealthFirst
*
* @author 疑似新冠帕鲁
* @since 2024/2/24
*/
@Component
public class HealthFirstTaskTrackerSelector implements TaskTrackerSelector {
@Override
public DispatchStrategy strategy() {
return DispatchStrategy.HEALTH_FIRST;
}
@Override
public WorkerInfo select(JobInfoDO jobInfoDO, InstanceInfoDO instanceInfoDO, List<WorkerInfo> availableWorkers) {
List<WorkerInfo> workers = Lists.newArrayList(availableWorkers);
workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
return workers.get(0);
}
}

View File

@ -0,0 +1,32 @@
package tech.powerjob.server.remote.worker.selector.impl;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.remote.worker.selector.TaskTrackerSelector;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* RANDOM
*
* @author 疑似新冠帕鲁
* @since 2024/2/24
*/
@Component
public class RandomTaskTrackerSelector implements TaskTrackerSelector {
@Override
public DispatchStrategy strategy() {
return DispatchStrategy.RANDOM;
}
@Override
public WorkerInfo select(JobInfoDO jobInfoDO, InstanceInfoDO instanceInfoDO, List<WorkerInfo> availableWorkers) {
int randomIdx = ThreadLocalRandom.current().nextInt(availableWorkers.size());
return availableWorkers.get(randomIdx);
}
}

View File

@ -0,0 +1,59 @@
package tech.powerjob.server.remote.worker.selector.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.DispatchStrategy;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.remote.worker.selector.TaskTrackerSelector;
import tech.powerjob.server.remote.worker.utils.SpecifyUtils;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* 指定工作的主节点大规模运算需要隔离主节点以防止 worker 部署打断整体的任务执行
*
* @author tjq
* @since 2024/2/24
*/
@Slf4j
@Component
public class SpecifyTaskTrackerSelector implements TaskTrackerSelector {
@Override
public DispatchStrategy strategy() {
return DispatchStrategy.SPECIFY;
}
@Override
public WorkerInfo select(JobInfoDO jobInfoDO, InstanceInfoDO instanceInfoDO, List<WorkerInfo> availableWorkers) {
String dispatchStrategyConfig = jobInfoDO.getDispatchStrategyConfig();
// 降级到随机
if (StringUtils.isEmpty(dispatchStrategyConfig)) {
log.warn("[SpecifyTaskTrackerSelector] job[id={}]'s dispatchStrategyConfig is empty, use random as bottom DispatchStrategy!", jobInfoDO.getId());
return availableWorkers.get(ThreadLocalRandom.current().nextInt(availableWorkers.size()));
}
List<WorkerInfo> targetWorkers = Lists.newArrayList();
availableWorkers.forEach(aw -> {
boolean match = SpecifyUtils.match(aw, dispatchStrategyConfig);
if (match) {
targetWorkers.add(aw);
}
});
if (CollectionUtils.isEmpty(targetWorkers)) {
log.warn("[SpecifyTaskTrackerSelector] Unable to find available nodes based on conditions for job(id={},dispatchStrategyConfig={}), use random as bottom DispatchStrategy!", jobInfoDO.getId(), dispatchStrategyConfig);
return availableWorkers.get(ThreadLocalRandom.current().nextInt(availableWorkers.size()));
}
// 如果有多个 worker 符合条件最终还是随机选择出一个
return targetWorkers.get(ThreadLocalRandom.current().nextInt(targetWorkers.size()));
}
}

View File

@ -0,0 +1,51 @@
package tech.powerjob.server.remote.worker.utils;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.common.module.WorkerInfo;
import java.util.Optional;
import java.util.Set;
/**
* 指定工具
*
* @author tjq
* @since 2024/2/24
*/
public class SpecifyUtils {
private static final String TAG_EQUALS = "tagEquals:";
private static final String TAG_IN = "tagIn:";
public static boolean match(WorkerInfo workerInfo, String specifyInfo) {
String workerTag = workerInfo.getTag();
// tagIn 语法worker 可上报多个tag WorkerInfo#tag=tag1,tag2,tag3配置中指定 tagIn=tag1 即可命中
if (specifyInfo.startsWith(TAG_IN)) {
String targetTag = specifyInfo.replace(TAG_IN, StringUtils.EMPTY);
return Optional.ofNullable(workerTag).orElse(StringUtils.EMPTY).contains(targetTag);
}
// tagEquals 语法字符串完全匹配worker 只可上报一个 tag WorkerInfo#tag=tag1配置中指定 tagEquals=tag1 即可命中
if (specifyInfo.startsWith(TAG_EQUALS)) {
String targetTag = specifyInfo.replace(TAG_EQUALS, StringUtils.EMPTY);
return Optional.ofNullable(workerTag).orElse(StringUtils.EMPTY).equals(targetTag);
}
// 默认情况IP tag 逗号分割后任意完全匹配即视为命中兼容 4.3.8 版本前序逻辑
Set<String> designatedWorkersSet = Sets.newHashSet(SJ.COMMA_SPLITTER.splitToList(specifyInfo));
for (String tagOrAddress : designatedWorkersSet) {
if (tagOrAddress.equals(workerInfo.getTag()) || tagOrAddress.equals(workerInfo.getAddress())) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,45 @@
package tech.powerjob.server.remote.worker.utils;
import org.junit.jupiter.api.Test;
import tech.powerjob.server.common.module.WorkerInfo;
import static org.junit.jupiter.api.Assertions.*;
/**
* SpecifyUtilsTest
*
* @author tjq
* @since 2024/2/24
*/
class SpecifyUtilsTest {
@Test
void match() {
WorkerInfo workerInfo = new WorkerInfo();
workerInfo.setAddress("192.168.1.1");
workerInfo.setTag("tag1");
assert SpecifyUtils.match(workerInfo, "192.168.1.1");
assert SpecifyUtils.match(workerInfo, "192.168.1.1,192.168.1.2,192.168.1.3,192.168.1.4");
assert !SpecifyUtils.match(workerInfo, "172.168.1.1");
assert !SpecifyUtils.match(workerInfo, "172.168.1.1,172.168.1.2,172.168.1.3");
assert SpecifyUtils.match(workerInfo, "tag1");
assert SpecifyUtils.match(workerInfo, "tag1,tag2");
assert !SpecifyUtils.match(workerInfo, "t1");
assert !SpecifyUtils.match(workerInfo, "t1,t2");
assert SpecifyUtils.match(workerInfo, "tagIn:tag1");
assert !SpecifyUtils.match(workerInfo, "tagIn:tag2");
assert SpecifyUtils.match(workerInfo, "tagEquals:tag1");
assert !SpecifyUtils.match(workerInfo, "tagEquals:tag2");
workerInfo.setTag("tag1,tag2,tag3");
assert SpecifyUtils.match(workerInfo, "tagIn:tag1");
assert SpecifyUtils.match(workerInfo, "tagIn:tag3");
assert !SpecifyUtils.match(workerInfo, "tagIn:tag99");
}
}

View File

@ -142,6 +142,11 @@ public class JobInfoVO {
private String dispatchStrategy;
/**
* 某种派发策略背后的具体配置值取决于 dispatchStrategy
*/
private String dispatchStrategyConfig;
private LifeCycle lifeCycle;
private AlarmConfig alarmConfig;

View File

@ -17,7 +17,7 @@ import java.util.Collections;
* @since 2020/4/17
*/
@Slf4j
@Component
@Component("testBaseProcessor")
public class StandaloneProcessorDemo implements BasicProcessor {
@Override