From e24f20f5ba9b9acaa6123c4413621653ef91e315 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 19 Feb 2021 23:13:14 +0800 Subject: [PATCH] refactor: use DesignateServer to get worker info list --- .../remote/server/FriendRequestHandler.java | 49 ++------------ .../DefaultServerElectionService.java | 1 - .../server/{request => election}/Ping.java | 2 +- .../redirector/DesignateServerAspect.java | 40 +++++++++-- .../RemoteProcessReq.java | 2 +- .../redirector/RemoteRequestProcessor.java | 38 +++++++++++ .../FriendQueryWorkerClusterStatusReq.java | 20 ------ .../cluster/WorkerClusterQueryService.java | 8 +++ .../web/controller/SystemInfoController.java | 66 ++++--------------- .../server/web/response/WorkerStatusVO.java | 10 ++- 10 files changed, 107 insertions(+), 129 deletions(-) rename powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/{request => election}/Ping.java (78%) rename powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/{request => redirector}/RemoteProcessReq.java (86%) create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/RemoteRequestProcessor.java delete mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/request/FriendQueryWorkerClusterStatusReq.java diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java index b6d8cef1..bd5529ea 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java @@ -1,21 +1,14 @@ package com.github.kfcfans.powerjob.server.remote.server; import akka.actor.AbstractActor; -import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.response.AskResponse; -import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; -import com.github.kfcfans.powerjob.server.remote.server.request.FriendQueryWorkerClusterStatusReq; -import com.github.kfcfans.powerjob.server.remote.server.request.Ping; -import com.github.kfcfans.powerjob.server.remote.server.request.RemoteProcessReq; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; +import com.github.kfcfans.powerjob.server.remote.server.election.Ping; +import com.github.kfcfans.powerjob.server.remote.server.redirector.RemoteProcessReq; +import com.github.kfcfans.powerjob.server.remote.server.redirector.RemoteRequestProcessor; import com.github.kfcfans.powerjob.server.remote.transport.TransportService; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.springframework.util.ReflectionUtils; - -import java.lang.reflect.Method; -import java.util.Map; /** * 处理朋友们的信息(处理服务器与服务器之间的通讯) @@ -30,7 +23,6 @@ public class FriendRequestHandler extends AbstractActor { return receiveBuilder() .match(Ping.class, this::onReceivePing) .match(RemoteProcessReq.class, this::onReceiveRemoteProcessReq) - .match(FriendQueryWorkerClusterStatusReq.class, this::onReceiveFriendQueryWorkerClusterStatusReq) .matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj)) .build(); } @@ -42,43 +34,12 @@ public class FriendRequestHandler extends AbstractActor { getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf()); } - /** - * 处理查询Worker节点的请求 - */ - private void onReceiveFriendQueryWorkerClusterStatusReq(FriendQueryWorkerClusterStatusReq req) { - Map workerInfo = WorkerClusterManagerService.getActiveWorkerInfo(req.getAppId()); - AskResponse askResponse = AskResponse.succeed(workerInfo); - getSender().tell(askResponse, getSelf()); - } - private void onReceiveRemoteProcessReq(RemoteProcessReq req) { AskResponse response = new AskResponse(); response.setSuccess(true); try { - - Object[] args = req.getArgs(); - String[] parameterTypes = req.getParameterTypes(); - Class[] parameters = new Class[parameterTypes.length]; - - for (int i = 0; i < parameterTypes.length; i++) { - parameters[i] = Class.forName(parameterTypes[i]); - Object arg = args[i]; - if (arg != null) { - args[i] = JSONObject.parseObject(JSONObject.toJSONBytes(arg), parameters[i]); - } - } - - Class clz = Class.forName(req.getClassName()); - - Object bean = SpringUtils.getBean(clz); - Method method = ReflectionUtils.findMethod(clz, req.getMethodName(), parameters); - - assert method != null; - Object invokeResult = ReflectionUtils.invokeMethod(method, bean, args); - - response.setData(JSONObject.toJSONBytes(invokeResult)); - + response.setData(JsonUtils.toBytes(RemoteRequestProcessor.processRemoteRequest(req))); } catch (Throwable t) { log.error("[FriendActor] process remote request[{}] failed!", req, t); response.setSuccess(false); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java index 236b24a3..95440b76 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java @@ -9,7 +9,6 @@ import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.server.extension.LockService; import com.github.kfcfans.powerjob.server.extension.ServerElectionService; -import com.github.kfcfans.powerjob.server.remote.server.request.Ping; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.remote.transport.TransportService; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/request/Ping.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/Ping.java similarity index 78% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/request/Ping.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/Ping.java index df6d02af..758d215c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/request/Ping.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/Ping.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.powerjob.server.remote.server.request; +package com.github.kfcfans.powerjob.server.remote.server.election; import com.github.kfcfans.powerjob.common.OmsSerializable; import lombok.Data; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/DesignateServerAspect.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/DesignateServerAspect.java index d4d789f3..ab62d6ae 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/DesignateServerAspect.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/DesignateServerAspect.java @@ -1,14 +1,15 @@ package com.github.kfcfans.powerjob.server.remote.server.redirector; import akka.pattern.Patterns; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.TypeFactory; import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.response.AskResponse; -import com.github.kfcfans.powerjob.server.remote.server.request.RemoteProcessReq; -import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; +import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; @@ -19,6 +20,9 @@ import org.aspectj.lang.reflect.MethodSignature; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.time.Duration; import java.util.Arrays; import java.util.Objects; @@ -38,6 +42,8 @@ public class DesignateServerAspect { @Resource private AppInfoRepository appInfoRepository; + private static final ObjectMapper objectMapper = new ObjectMapper(); + @Around(value = "@annotation(designateServer))") public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable { @@ -88,6 +94,32 @@ public class DesignateServerAspect { if (!askResponse.isSuccess()) { throw new PowerJobException("remote process failed: " + askResponse.getMessage()); } - return JSONObject.parseObject(askResponse.getData(), methodSignature.getReturnType()); + + // 考虑范型情况 + Method method = methodSignature.getMethod(); + JavaType returnType = getMethodReturnJavaType(method); + + return objectMapper.readValue(askResponse.getData(), returnType); + } + + + private static JavaType getMethodReturnJavaType(Method method) { + Type type = method.getGenericReturnType(); + return getJavaType(type); + } + + private static JavaType getJavaType(Type type) { + if (type instanceof ParameterizedType) { + Type[] actualTypeArguments = ((ParameterizedType)type).getActualTypeArguments(); + Class rowClass = (Class) ((ParameterizedType)type).getRawType(); + JavaType[] javaTypes = new JavaType[actualTypeArguments.length]; + for (int i = 0; i < actualTypeArguments.length; i++) { + //泛型也可能带有泛型,递归处理 + javaTypes[i] = getJavaType(actualTypeArguments[i]); + } + return TypeFactory.defaultInstance().constructParametricType(rowClass, javaTypes); + } else { + return TypeFactory.defaultInstance().constructParametricType((Class) type, new JavaType[0]); + } } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/request/RemoteProcessReq.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/RemoteProcessReq.java similarity index 86% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/request/RemoteProcessReq.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/RemoteProcessReq.java index 022dff34..8fcabae7 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/request/RemoteProcessReq.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/RemoteProcessReq.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.powerjob.server.remote.server.request; +package com.github.kfcfans.powerjob.server.remote.server.redirector; import com.github.kfcfans.powerjob.common.OmsSerializable; import lombok.Getter; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/RemoteRequestProcessor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/RemoteRequestProcessor.java new file mode 100644 index 00000000..8010b9fb --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/redirector/RemoteRequestProcessor.java @@ -0,0 +1,38 @@ +package com.github.kfcfans.powerjob.server.remote.server.redirector; + +import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Method; + +/** + * process remote request + * + * @author tjq + * @since 2021/2/19 + */ +public class RemoteRequestProcessor { + + public static Object processRemoteRequest(RemoteProcessReq req) throws ClassNotFoundException { + Object[] args = req.getArgs(); + String[] parameterTypes = req.getParameterTypes(); + Class[] parameters = new Class[parameterTypes.length]; + + for (int i = 0; i < parameterTypes.length; i++) { + parameters[i] = Class.forName(parameterTypes[i]); + Object arg = args[i]; + if (arg != null) { + args[i] = JSONObject.parseObject(JSONObject.toJSONBytes(arg), parameters[i]); + } + } + + Class clz = Class.forName(req.getClassName()); + + Object bean = SpringUtils.getBean(clz); + Method method = ReflectionUtils.findMethod(clz, req.getMethodName(), parameters); + + assert method != null; + return ReflectionUtils.invokeMethod(method, bean, args); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/request/FriendQueryWorkerClusterStatusReq.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/request/FriendQueryWorkerClusterStatusReq.java deleted file mode 100644 index eb77d230..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/request/FriendQueryWorkerClusterStatusReq.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.github.kfcfans.powerjob.server.remote.server.request; - -import com.github.kfcfans.powerjob.common.OmsSerializable; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -/** - * 查询 Worker 集群状态 - * - * @author tjq - * @since 2020/4/14 - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -public class FriendQueryWorkerClusterStatusReq implements OmsSerializable { - private Long appId; -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java index 8b9ad82f..1986d2f3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.remote.worker.cluster; import com.github.kfcfans.powerjob.server.extension.WorkerFilter; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -44,6 +45,13 @@ public class WorkerClusterQueryService { return workers; } + @DesignateServer(appIdParameterName = "appId") + public List getAllWorkers(Long appId) { + List workers = WorkerClusterManagerService.getWorkerInfosByAppId(appId); + workers.sort((o1, o2) -> o2 .getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); + return workers; + } + private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) { for (WorkerFilter filter : workerFilters) { if (filter.filter(workerInfo, jobInfo)) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java index f11cbffb..f8d6af89 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java @@ -1,37 +1,27 @@ package com.github.kfcfans.powerjob.server.web.controller; -import akka.actor.ActorSelection; -import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.OmsConstant; -import com.github.kfcfans.powerjob.common.RemoteConstant; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; -import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.ResultDTO; -import com.github.kfcfans.powerjob.common.utils.JsonUtils; -import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter; -import com.github.kfcfans.powerjob.server.remote.server.request.FriendQueryWorkerClusterStatusReq; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; -import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; -import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import com.github.kfcfans.powerjob.server.web.response.SystemOverviewVO; import com.github.kfcfans.powerjob.server.web.response.WorkerStatusVO; -import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.DateUtils; -import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; +import java.util.stream.Collectors; /** * 系统信息控制器(服务于前端首页) @@ -44,53 +34,19 @@ import java.util.concurrent.TimeUnit; @RequestMapping("/system") public class SystemInfoController { - @Resource - private AppInfoRepository appInfoRepository; @Resource private JobInfoRepository jobInfoRepository; @Resource private InstanceInfoRepository instanceInfoRepository; + @Resource + private WorkerClusterQueryService workerClusterQueryService; + @GetMapping("/listWorker") - @SuppressWarnings({"unchecked", "rawtypes"}) public ResultDTO> listWorker(Long appId) { - Optional appInfoOpt = appInfoRepository.findById(appId); - if (!appInfoOpt.isPresent()) { - return ResultDTO.failed("unknown appId of " +appId); - } - String server =appInfoOpt.get().getCurrentServer(); - // 没有 Server,说明从来没有该 appId 的 worker 集群连接过 - if (StringUtils.isEmpty(server)) { - return ResultDTO.success(Collections.emptyList()); - } - - // 重定向到指定 Server 获取集群信息 - FriendQueryWorkerClusterStatusReq req = new FriendQueryWorkerClusterStatusReq(appId); - try { - ActorSelection friendActor = AkkaStarter.getFriendActor(server); - CompletionStage askCS = Patterns.ask(friendActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); - AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - - if (askResponse.isSuccess()) { - Map address2Info = askResponse.getData(Map.class); - List result = Lists.newLinkedList(); - address2Info.forEach((address, m) -> { - try { - WorkerInfo metrics = JsonUtils.parseObject(JsonUtils.toJSONString(m), WorkerInfo.class); - WorkerStatusVO info = new WorkerStatusVO(String.valueOf(address), metrics); - result.add(info); - }catch (Exception e) { - log.error("[SystemInfoController] parse ask response failed!", e); - } - }); - return ResultDTO.success(result); - } - return ResultDTO.failed(askResponse.getMessage()); - }catch (Exception e) { - log.error("[SystemInfoController] listWorker for appId:{} failed, exception is {}", appId, e.toString()); - return ResultDTO.failed("no worker or server available"); - } + List workerInfos = workerClusterQueryService.getAllWorkers(appId); + return ResultDTO.success(workerInfos.stream().map(WorkerStatusVO::new).collect(Collectors.toList())); } @GetMapping("/overview") diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkerStatusVO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkerStatusVO.java index a1e75b78..61670325 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkerStatusVO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkerStatusVO.java @@ -24,7 +24,7 @@ public class WorkerStatusVO { private String protocol; - // 1 -> 健康,绿色,2 -> 一般,橙色,3 -> 糟糕,红色 + // 1 -> 健康,绿色,2 -> 一般,橙色,3 -> 糟糕,红色,9999 -> 非在线机器 private int status; // 12.3%(4 cores) @@ -35,14 +35,14 @@ public class WorkerStatusVO { private static final double THRESHOLD = 0.8; - public WorkerStatusVO(String address, WorkerInfo workerInfo) { + public WorkerStatusVO(WorkerInfo workerInfo) { SystemMetrics systemMetrics = workerInfo.getSystemMetrics(); this.protocol = workerInfo.getProtocol(); this.status = 1; - this.address = address; + this.address = workerInfo.getAddress(); this.cpuLoad = String.format(CPU_FORMAT, df.format(systemMetrics.getCpuLoad()), systemMetrics.getCpuProcessors()); if (systemMetrics.getCpuLoad() > systemMetrics.getCpuProcessors() * THRESHOLD) { this.status ++; @@ -63,5 +63,9 @@ public class WorkerStatusVO { if (systemMetrics.getDiskUsage() > THRESHOLD) { this.status ++; } + + if (workerInfo.timeout()) { + this.status = 9999; + } } }