refactor: use DesignateServer to get worker info list

This commit is contained in:
tjq 2021-02-19 23:13:14 +08:00
parent 2e488b5837
commit e24f20f5ba
10 changed files with 107 additions and 129 deletions

View File

@ -1,21 +1,14 @@
package com.github.kfcfans.powerjob.server.remote.server;
import akka.actor.AbstractActor;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.remote.server.request.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.powerjob.server.remote.server.request.Ping;
import com.github.kfcfans.powerjob.server.remote.server.request.RemoteProcessReq;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.server.remote.server.election.Ping;
import com.github.kfcfans.powerjob.server.remote.server.redirector.RemoteProcessReq;
import com.github.kfcfans.powerjob.server.remote.server.redirector.RemoteRequestProcessor;
import com.github.kfcfans.powerjob.server.remote.transport.TransportService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.Map;
/**
* 处理朋友们的信息处理服务器与服务器之间的通讯
@ -30,7 +23,6 @@ public class FriendRequestHandler extends AbstractActor {
return receiveBuilder()
.match(Ping.class, this::onReceivePing)
.match(RemoteProcessReq.class, this::onReceiveRemoteProcessReq)
.match(FriendQueryWorkerClusterStatusReq.class, this::onReceiveFriendQueryWorkerClusterStatusReq)
.matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj))
.build();
}
@ -42,43 +34,12 @@ public class FriendRequestHandler extends AbstractActor {
getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf());
}
/**
* 处理查询Worker节点的请求
*/
private void onReceiveFriendQueryWorkerClusterStatusReq(FriendQueryWorkerClusterStatusReq req) {
Map<String, WorkerInfo> workerInfo = WorkerClusterManagerService.getActiveWorkerInfo(req.getAppId());
AskResponse askResponse = AskResponse.succeed(workerInfo);
getSender().tell(askResponse, getSelf());
}
private void onReceiveRemoteProcessReq(RemoteProcessReq req) {
AskResponse response = new AskResponse();
response.setSuccess(true);
try {
Object[] args = req.getArgs();
String[] parameterTypes = req.getParameterTypes();
Class<?>[] parameters = new Class[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i++) {
parameters[i] = Class.forName(parameterTypes[i]);
Object arg = args[i];
if (arg != null) {
args[i] = JSONObject.parseObject(JSONObject.toJSONBytes(arg), parameters[i]);
}
}
Class<?> clz = Class.forName(req.getClassName());
Object bean = SpringUtils.getBean(clz);
Method method = ReflectionUtils.findMethod(clz, req.getMethodName(), parameters);
assert method != null;
Object invokeResult = ReflectionUtils.invokeMethod(method, bean, args);
response.setData(JSONObject.toJSONBytes(invokeResult));
response.setData(JsonUtils.toBytes(RemoteRequestProcessor.processRemoteRequest(req)));
} catch (Throwable t) {
log.error("[FriendActor] process remote request[{}] failed!", req, t);
response.setSuccess(false);

View File

@ -9,7 +9,6 @@ import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.server.extension.LockService;
import com.github.kfcfans.powerjob.server.extension.ServerElectionService;
import com.github.kfcfans.powerjob.server.remote.server.request.Ping;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.remote.transport.TransportService;

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.powerjob.server.remote.server.request;
package com.github.kfcfans.powerjob.server.remote.server.election;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.Data;

View File

@ -1,14 +1,15 @@
package com.github.kfcfans.powerjob.server.remote.server.redirector;
import akka.pattern.Patterns;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.remote.server.request.RemoteProcessReq;
import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
@ -19,6 +20,9 @@ import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
@ -38,6 +42,8 @@ public class DesignateServerAspect {
@Resource
private AppInfoRepository appInfoRepository;
private static final ObjectMapper objectMapper = new ObjectMapper();
@Around(value = "@annotation(designateServer))")
public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable {
@ -88,6 +94,32 @@ public class DesignateServerAspect {
if (!askResponse.isSuccess()) {
throw new PowerJobException("remote process failed: " + askResponse.getMessage());
}
return JSONObject.parseObject(askResponse.getData(), methodSignature.getReturnType());
// 考虑范型情况
Method method = methodSignature.getMethod();
JavaType returnType = getMethodReturnJavaType(method);
return objectMapper.readValue(askResponse.getData(), returnType);
}
private static JavaType getMethodReturnJavaType(Method method) {
Type type = method.getGenericReturnType();
return getJavaType(type);
}
private static JavaType getJavaType(Type type) {
if (type instanceof ParameterizedType) {
Type[] actualTypeArguments = ((ParameterizedType)type).getActualTypeArguments();
Class<?> rowClass = (Class<?>) ((ParameterizedType)type).getRawType();
JavaType[] javaTypes = new JavaType[actualTypeArguments.length];
for (int i = 0; i < actualTypeArguments.length; i++) {
//泛型也可能带有泛型递归处理
javaTypes[i] = getJavaType(actualTypeArguments[i]);
}
return TypeFactory.defaultInstance().constructParametricType(rowClass, javaTypes);
} else {
return TypeFactory.defaultInstance().constructParametricType((Class<?>) type, new JavaType[0]);
}
}
}

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.powerjob.server.remote.server.request;
package com.github.kfcfans.powerjob.server.remote.server.redirector;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.Getter;

View File

@ -0,0 +1,38 @@
package com.github.kfcfans.powerjob.server.remote.server.redirector;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
/**
* process remote request
*
* @author tjq
* @since 2021/2/19
*/
public class RemoteRequestProcessor {
public static Object processRemoteRequest(RemoteProcessReq req) throws ClassNotFoundException {
Object[] args = req.getArgs();
String[] parameterTypes = req.getParameterTypes();
Class<?>[] parameters = new Class[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i++) {
parameters[i] = Class.forName(parameterTypes[i]);
Object arg = args[i];
if (arg != null) {
args[i] = JSONObject.parseObject(JSONObject.toJSONBytes(arg), parameters[i]);
}
}
Class<?> clz = Class.forName(req.getClassName());
Object bean = SpringUtils.getBean(clz);
Method method = ReflectionUtils.findMethod(clz, req.getMethodName(), parameters);
assert method != null;
return ReflectionUtils.invokeMethod(method, bean, args);
}
}

View File

@ -1,20 +0,0 @@
package com.github.kfcfans.powerjob.server.remote.server.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 查询 Worker 集群状态
*
* @author tjq
* @since 2020/4/14
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FriendQueryWorkerClusterStatusReq implements OmsSerializable {
private Long appId;
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.remote.worker.cluster;
import com.github.kfcfans.powerjob.server.extension.WorkerFilter;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -44,6 +45,13 @@ public class WorkerClusterQueryService {
return workers;
}
@DesignateServer(appIdParameterName = "appId")
public List<WorkerInfo> getAllWorkers(Long appId) {
List<WorkerInfo> workers = WorkerClusterManagerService.getWorkerInfosByAppId(appId);
workers.sort((o1, o2) -> o2 .getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
return workers;
}
private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) {
for (WorkerFilter filter : workerFilters) {
if (filter.filter(workerInfo, jobInfo)) {

View File

@ -1,37 +1,27 @@
package com.github.kfcfans.powerjob.server.web.controller;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter;
import com.github.kfcfans.powerjob.server.remote.server.request.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.server.web.response.SystemOverviewVO;
import com.github.kfcfans.powerjob.server.web.response.WorkerStatusVO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.stream.Collectors;
/**
* 系统信息控制器服务于前端首页
@ -44,53 +34,19 @@ import java.util.concurrent.TimeUnit;
@RequestMapping("/system")
public class SystemInfoController {
@Resource
private AppInfoRepository appInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
@GetMapping("/listWorker")
@SuppressWarnings({"unchecked", "rawtypes"})
public ResultDTO<List<WorkerStatusVO>> listWorker(Long appId) {
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
if (!appInfoOpt.isPresent()) {
return ResultDTO.failed("unknown appId of " +appId);
}
String server =appInfoOpt.get().getCurrentServer();
// 没有 Server说明从来没有该 appId worker 集群连接过
if (StringUtils.isEmpty(server)) {
return ResultDTO.success(Collections.emptyList());
}
// 重定向到指定 Server 获取集群信息
FriendQueryWorkerClusterStatusReq req = new FriendQueryWorkerClusterStatusReq(appId);
try {
ActorSelection friendActor = AkkaStarter.getFriendActor(server);
CompletionStage<Object> askCS = Patterns.ask(friendActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (askResponse.isSuccess()) {
Map address2Info = askResponse.getData(Map.class);
List<WorkerStatusVO> result = Lists.newLinkedList();
address2Info.forEach((address, m) -> {
try {
WorkerInfo metrics = JsonUtils.parseObject(JsonUtils.toJSONString(m), WorkerInfo.class);
WorkerStatusVO info = new WorkerStatusVO(String.valueOf(address), metrics);
result.add(info);
}catch (Exception e) {
log.error("[SystemInfoController] parse ask response failed!", e);
}
});
return ResultDTO.success(result);
}
return ResultDTO.failed(askResponse.getMessage());
}catch (Exception e) {
log.error("[SystemInfoController] listWorker for appId:{} failed, exception is {}", appId, e.toString());
return ResultDTO.failed("no worker or server available");
}
List<WorkerInfo> workerInfos = workerClusterQueryService.getAllWorkers(appId);
return ResultDTO.success(workerInfos.stream().map(WorkerStatusVO::new).collect(Collectors.toList()));
}
@GetMapping("/overview")

View File

@ -24,7 +24,7 @@ public class WorkerStatusVO {
private String protocol;
// 1 -> 健康绿色2 -> 一般橙色3 -> 糟糕红色
// 1 -> 健康绿色2 -> 一般橙色3 -> 糟糕红色9999 -> 非在线机器
private int status;
// 12.3%(4 cores)
@ -35,14 +35,14 @@ public class WorkerStatusVO {
private static final double THRESHOLD = 0.8;
public WorkerStatusVO(String address, WorkerInfo workerInfo) {
public WorkerStatusVO(WorkerInfo workerInfo) {
SystemMetrics systemMetrics = workerInfo.getSystemMetrics();
this.protocol = workerInfo.getProtocol();
this.status = 1;
this.address = address;
this.address = workerInfo.getAddress();
this.cpuLoad = String.format(CPU_FORMAT, df.format(systemMetrics.getCpuLoad()), systemMetrics.getCpuProcessors());
if (systemMetrics.getCpuLoad() > systemMetrics.getCpuProcessors() * THRESHOLD) {
this.status ++;
@ -63,5 +63,9 @@ public class WorkerStatusVO {
if (systemMetrics.getDiskUsage() > THRESHOLD) {
this.status ++;
}
if (workerInfo.timeout()) {
this.status = 9999;
}
}
}