refactor: optimize DispatchService's worker filter code #215

This commit is contained in:
tjq 2021-02-19 20:52:52 +08:00
parent eac7ce7b27
commit daf42be5cb
17 changed files with 180 additions and 128 deletions

View File

@ -131,4 +131,10 @@ public class JobInfoDO {
private Date gmtModified;
/**
 * Extended parameters. PowerJob itself never uses this field; it is reserved for developers' own extensions.
 * For example, a custom WorkerFilter can read a filtering metric passed here, such as GpuUsage < 10.
 */
private String extra;
}
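To illustrate the extension point this new extra field enables, here is a minimal sketch of a custom WorkerFilter (the interface introduced later in this diff) that reads a GPU constraint from extra. The GpuUsageWorkerFilter class, the getExtra() accessor, and the queryGpuUsage() helper are illustrative assumptions, not part of this commit.

package com.example.extension; // hypothetical package, not part of this commit

import com.github.kfcfans.powerjob.server.extension.WorkerFilter;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

/**
 * Illustrative custom filter: drops workers whose GPU usage exceeds the
 * threshold carried in JobInfoDO#extra, e.g. "GpuUsage<10".
 */
@Component
public class GpuUsageWorkerFilter implements WorkerFilter {

    private static final String PREFIX = "GpuUsage<";

    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO) {
        String extra = jobInfoDO.getExtra(); // assumes the Lombok-generated getter
        if (StringUtils.isEmpty(extra) || !extra.startsWith(PREFIX)) {
            // no GPU constraint configured -> keep the worker
            return false;
        }
        double maxUsage = Double.parseDouble(extra.substring(PREFIX.length()).trim());
        double currentUsage = queryGpuUsage(workerInfo.getAddress());
        // returning true removes the worker from the candidate list
        return currentUsage >= maxUsage;
    }

    private double queryGpuUsage(String workerAddress) {
        // placeholder: fetch the metric from your own monitoring system
        return 0.0;
    }
}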

View File

@ -1,19 +1,17 @@
package com.github.kfcfans.powerjob.server.service;
package com.github.kfcfans.powerjob.server.remote;
import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService;
import com.github.kfcfans.powerjob.server.remote.transport.TransportService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService;
import com.github.kfcfans.powerjob.server.service.lock.local.UseSegmentLock;
import com.github.kfcfans.powerjob.server.remote.transport.TransportService;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@ -23,7 +21,6 @@ import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static com.github.kfcfans.powerjob.common.InstanceStatus.*;
@ -42,6 +39,8 @@ public class DispatchService {
@Resource
private TransportService transportService;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
@Resource
private InstanceManager instanceManager;
@ -50,7 +49,6 @@ public class DispatchService {
@Resource
private InstanceInfoRepository instanceInfoRepository;
private static final Splitter COMMA_SPLITTER = Splitter.on(",");
/**
* 重新派发任务
@ -120,12 +118,12 @@ public class DispatchService {
return;
}
}
// obtain the most suitable worker list
List<WorkerInfo> suitableWorkers = obtainSuitableWorkers(jobInfo);
List<WorkerInfo> suitableWorkers = workerClusterQueryService.getSuitableWorkers(jobInfo);
if (CollectionUtils.isEmpty(suitableWorkers)) {
String clusterStatusDescription = WorkerClusterManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId());
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription);
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", jobId, instanceId);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, now);
instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
@ -151,32 +149,6 @@ public class DispatchService {
instanceMetadataService.loadJobInfo(instanceId, jobInfo);
}
/**
* Obtain the list of the most suitable workers
*/
private List<WorkerInfo> obtainSuitableWorkers(JobInfoDO jobInfo) {
// fetch all currently available workers
List<WorkerInfo> allAvailableWorker = WorkerClusterManagerService.getSortedAvailableWorkers(jobInfo.getAppId(), jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
// filter by designated workers
allAvailableWorker.removeIf(worker -> {
// if none are designated, filter nothing
if (StringUtils.isEmpty(jobInfo.getDesignatedWorkers())) {
return false;
}
// if non-empty, only matching workers are kept
Set<String> designatedWorkers = Sets.newHashSet(COMMA_SPLITTER.splitToList(jobInfo.getDesignatedWorkers()));
return !designatedWorkers.contains(worker.getAddress());
});
// limit cluster size; 0 means unlimited
if (!allAvailableWorker.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && allAvailableWorker.size() > jobInfo.getMaxWorkerCount()) {
allAvailableWorker = allAvailableWorker.subList(0, jobInfo.getMaxWorkerCount());
}
return allAvailableWorker;
}
/**
* Build the job dispatch request
*/

View File

@ -1,8 +1,6 @@
package com.github.kfcfans.powerjob.server.remote.worker.cluster;
import com.alibaba.fastjson.JSON;
import com.github.kfcfans.powerjob.common.model.DeployedContainerInfo;
import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -12,7 +10,6 @@ import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Manages the status of a worker cluster
@ -30,7 +27,6 @@ public class ClusterStatusHolder {
// container deployment status of all machines in the cluster: containerId -> (workerAddress -> containerInfo)
private Map<Long, Map<String, DeployedContainerInfo>> containerId2Infos;
private static final long WORKER_TIMEOUT_MS = 60000;
public ClusterStatusHolder(String appName) {
this.appName = appName;
@ -68,53 +64,14 @@ public class ClusterStatusHolder {
}
}
/**
* Get all currently available workers
* @param minCPUCores minimum number of CPU cores
* @param minMemorySpace minimum available memory space, in GB
* @param minDiskSpace minimum available disk space, in GB
* @return List<WorkerInfo>
*/
public List<WorkerInfo> getSortedAvailableWorkers(double minCPUCores, double minMemorySpace, double minDiskSpace) {
List<WorkerInfo> workers = getAvailableWorkers(minCPUCores, minMemorySpace, minDiskSpace);
// sort by machine health score
workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
return workers;
public List<WorkerInfo> getAllWorkers() {
return Lists.newLinkedList(address2WorkerInfo.values());
}
public WorkerInfo getWorkerInfo(String address) {
return address2WorkerInfo.get(address);
}
public List<WorkerInfo> getAvailableWorkers(double minCPUCores, double minMemorySpace, double minDiskSpace) {
List<WorkerInfo> workerInfos = Lists.newArrayList();
address2WorkerInfo.forEach((address, workerInfo) -> {
if (timeout(address)) {
log.info("[ClusterStatusHolder] worker(address={},metrics={}) was filtered because of timeout, last active time is {}.", address, workerInfo.getSystemMetrics(), workerInfo.getLastActiveTime());
return;
}
// check system metrics
SystemMetrics metrics = workerInfo.getSystemMetrics();
if (metrics.available(minCPUCores, minMemorySpace, minDiskSpace)) {
workerInfos.add(workerInfo);
} else {
log.info("[ClusterStatusHolder] worker(address={},metrics={}) was filtered by config(minCPUCores={},minMemory={},minDiskSpace={})", address, metrics, minCPUCores, minMemorySpace, minDiskSpace);
}
});
return workerInfos;
}
/**
* Get a brief description of the whole cluster
* @return cluster description
*/
public String getClusterDescription() {
return String.format("appName:%s,clusterStatus:%s", appName, JSON.toJSONString(address2WorkerInfo));
}
/**
* Get details of the currently connected machines
* @return map
@ -122,7 +79,7 @@ public class ClusterStatusHolder {
public Map<String, WorkerInfo> getActiveWorkerInfo() {
Map<String, WorkerInfo> res = Maps.newHashMap();
address2WorkerInfo.forEach((address, workerInfo) -> {
if (!timeout(address)) {
if (!workerInfo.timeout()) {
res.put(address, workerInfo);
}
});
@ -154,7 +111,7 @@ public class ClusterStatusHolder {
// discard info of timed-out machines
List<String> timeoutAddress = Lists.newLinkedList();
address2WorkerInfo.forEach((addr, workerInfo) -> {
if (timeout(addr)) {
if (workerInfo.timeout()) {
timeoutAddress.add(addr);
}
});
@ -164,15 +121,4 @@ public class ClusterStatusHolder {
timeoutAddress.forEach(address2WorkerInfo::remove);
}
}
private boolean timeout(String address) {
// exclude timed-out machines
return Optional.ofNullable(address2WorkerInfo.get(address))
.map(workerInfo -> {
long timeout = System.currentTimeMillis() - workerInfo.getLastActiveTime();
return timeout > WORKER_TIMEOUT_MS;
})
.orElse(true);
}
}

View File

@ -9,7 +9,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.*;
/**
* Worker management service
* Manages worker cluster info
*
* @author tjq
* @since 2020/4/5
@ -32,25 +32,19 @@ public class WorkerClusterManagerService {
}
/**
* Get all currently available worker addresses, ordered by score from high to low; workers at the front are healthier
* Get info of all machines that have ever connected under a given app
* @param appId appId
* @return list of all machines that have connected
*/
public static List<WorkerInfo> getSortedAvailableWorkers(Long appId, double minCPUCores, double minMemorySpace, double minDiskSpace) {
public static List<WorkerInfo> getWorkerInfosByAppId(Long appId) {
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
if (clusterStatusHolder == null) {
log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
return Collections.emptyList();
}
return clusterStatusHolder.getSortedAvailableWorkers(minCPUCores, minMemorySpace, minDiskSpace);
return clusterStatusHolder.getAllWorkers();
}
public static List<WorkerInfo> getAvailableWorkers(Long appId, double minCPUCores, double minMemorySpace, double minDiskSpace) {
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
if (clusterStatusHolder == null) {
log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
return Collections.emptyList();
}
return clusterStatusHolder.getAvailableWorkers(minCPUCores, minMemorySpace, minDiskSpace);
}
public static Optional<WorkerInfo> getWorkerInfo(Long appId, String address) {
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
@ -70,19 +64,6 @@ public class WorkerClusterManagerService {
appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
}
/**
* Get the worker cluster status description for an app
* @param appId app ID
* @return cluster status description
*/
public static String getWorkerClusterStatusDescription(Long appId) {
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
if (clusterStatusHolder == null) {
return "CAN'T_FIND_ANY_WORKER";
}
return clusterStatusHolder.getClusterDescription();
}
/**
* Get info of the workers currently connected to this server
* @param appId app ID

View File

@ -0,0 +1,55 @@
package com.github.kfcfans.powerjob.server.remote.worker.cluster;
import com.github.kfcfans.powerjob.server.extension.WorkerFilter;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* Queries worker cluster info
*
* @author tjq
* @since 2021/2/19
*/
@Service
public class WorkerClusterQueryService {
private List<WorkerFilter> workerFilters;
@Autowired
public WorkerClusterQueryService(List<WorkerFilter> workerFilters) {
this.workerFilters = workerFilters;
}
/**
* Get the suitable workers for a job
* @param jobInfo job
* @return worker list, sorted by metrics score in descending order
*/
public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {
List<WorkerInfo> workers = WorkerClusterManagerService.getWorkerInfosByAppId(jobInfo.getAppId());
workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
// sort by health score
workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
// limit cluster size; 0 means unlimited
if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
workers = workers.subList(0, jobInfo.getMaxWorkerCount());
}
return workers;
}
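/**
 * a worker is excluded as soon as any registered WorkerFilter returns true for it
 */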
private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) {
for (WorkerFilter filter : workerFilters) {
if (filter.filter(workerInfo, jobInfo)) {
return true;
}
}
return false;
}
}
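For clarity on how the pieces compose: Spring injects every WorkerFilter bean into the constructor above, and a worker is kept only if no filter returns true for it. Below is a minimal sketch of assembling the same chain by hand (for example in a test). It assumes JobInfoDO's no-arg constructor and Lombok-generated setters; the class name and job values are made up for illustration.

import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.filter.DesignatedWorkerFilter;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.filter.DisconnectedWorkerFilter;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.filter.SystemMetricsWorkerFilter;
import com.google.common.collect.Lists;

import java.util.List;

public class WorkerSelectionSketch {

    public static void main(String[] args) {
        // the same chain Spring would inject as List<WorkerFilter>
        WorkerClusterQueryService queryService = new WorkerClusterQueryService(Lists.newArrayList(
                new DisconnectedWorkerFilter(),   // drops timed-out workers
                new DesignatedWorkerFilter(),     // honours jobInfo.designatedWorkers
                new SystemMetricsWorkerFilter()   // enforces min CPU / memory / disk
        ));

        JobInfoDO jobInfo = new JobInfoDO();      // illustrative values only
        jobInfo.setAppId(1L);
        jobInfo.setMinCpuCores(2.0);
        jobInfo.setMinMemorySpace(4.0);
        jobInfo.setMinDiskSpace(10.0);
        jobInfo.setMaxWorkerCount(3);

        // a worker survives only if no filter rejects it
        List<WorkerInfo> suitable = queryService.getSuitableWorkers(jobInfo);
        suitable.forEach(worker -> System.out.println(worker.getAddress()));
    }
}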

View File

@ -28,6 +28,8 @@ public class WorkerInfo {
private List<DeployedContainerInfo> containerInfos;
private static final long WORKER_TIMEOUT_MS = 60000;
public void refresh(WorkerHeartbeat workerHeartbeat) {
address = workerHeartbeat.getWorkerAddress();
lastActiveTime = workerHeartbeat.getHeartbeatTime();
@ -36,4 +38,9 @@ public class WorkerInfo {
systemMetrics = workerHeartbeat.getSystemMetrics();
containerInfos = workerHeartbeat.getContainerInfos();
}
public boolean timeout() {
long timeout = System.currentTimeMillis() - lastActiveTime;
return timeout > WORKER_TIMEOUT_MS;
}
}

View File

@ -0,0 +1,37 @@
package com.github.kfcfans.powerjob.server.remote.worker.cluster.filter;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.extension.WorkerFilter;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.Set;
/**
* Keep only the designated workers, if any are specified
*
* @author tjq
* @since 2021/2/19
*/
@Component
public class DesignatedWorkerFilter implements WorkerFilter {
// min length 1.1.1.1:1
private static final int MIN_IP_LENGTH = 9;
@Override
public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO) {
String designatedWorkers = jobInfoDO.getDesignatedWorkers();
if (StringUtils.isEmpty(designatedWorkers) || designatedWorkers.length() < MIN_IP_LENGTH) {
return false;
}
Set<String> designatedWorkersSet = Sets.newHashSet(SJ.commaSplitter.splitToList(designatedWorkers));
return !designatedWorkersSet.contains(workerInfo.getAddress());
}
}

View File

@ -0,0 +1,21 @@
package com.github.kfcfans.powerjob.server.remote.worker.cluster.filter;
import com.github.kfcfans.powerjob.server.extension.WorkerFilter;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import org.springframework.stereotype.Component;
/**
* Filters out disconnected (timed-out) workers
*
* @author tjq
* @since 2021/2/19
*/
@Component
public class DisconnectedWorkerFilter implements WorkerFilter {
@Override
public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO) {
return workerInfo.timeout();
}
}

View File

@ -0,0 +1,23 @@
package com.github.kfcfans.powerjob.server.remote.worker.cluster.filter;
import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import com.github.kfcfans.powerjob.server.extension.WorkerFilter;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import org.springframework.stereotype.Component;
/**
* Filters workers by system metrics (CPU / memory / disk)
*
* @author tjq
* @since 2021/2/19
*/
@Component
public class SystemMetricsWorkerFilter implements WorkerFilter {
@Override
public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
SystemMetrics metrics = workerInfo.getSystemMetrics();
return !metrics.available(jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
}
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.remote.worker.handler;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.common.request.*;
import com.github.kfcfans.powerjob.common.response.AskResponse;
@ -49,6 +50,9 @@ public class WorkerRequestHandler {
@Resource
private ContainerInfoRepository containerInfoRepository;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
private static WorkerRequestHandler workerRequestHandler;
/**
@ -134,7 +138,7 @@ public class WorkerRequestHandler {
if (!jobInfo.getAppId().equals(appId)) {
askResponse = AskResponse.failed("Permission Denied!");
}else {
List<String> sortedAvailableWorker = WorkerClusterManagerService.getAvailableWorkers(appId, jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace())
List<String> sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo)
.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
askResponse = AskResponse.succeed(sortedAvailableWorker);
}

View File

@ -8,6 +8,7 @@ import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.remote.DispatchService;
import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils;

View File

@ -8,7 +8,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.remote.DispatchService;
import com.github.kfcfans.powerjob.server.service.InstanceLogService;
import com.github.kfcfans.powerjob.server.service.UserService;
import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;

View File

@ -15,7 +15,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.remote.DispatchService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
import com.github.kfcfans.powerjob.server.remote.transport.TransportService;

View File

@ -8,7 +8,7 @@ import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.persistence.core.model.*;
import com.github.kfcfans.powerjob.server.persistence.core.repository.*;
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.remote.DispatchService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
import com.google.common.base.Stopwatch;

View File

@ -12,7 +12,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRep
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.remote.DispatchService;
import com.github.kfcfans.powerjob.server.service.JobService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;

View File

@ -13,7 +13,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRep
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowNodeInfoRepository;
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.remote.DispatchService;
import com.github.kfcfans.powerjob.server.service.UserService;
import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;
import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm;

View File

@ -13,8 +13,7 @@ public interface BasicProcessor {
/**
* Core processing logic.
* The workflow context can be fetched via {@link TaskContext#fetchWorkflowContext}
* Data can be appended to the workflow context via {@link TaskContext#appendData2WfContext}
* The workflow context can be fetched via {@link TaskContext#workflowContext}
*
* @param context task context; jobParams and instanceParams hold the console parameters and the instance parameters passed via OpenAPI, respectively
* @return processing result; msg is length-limited and will be truncated if too long; returning null is not allowed