From 5586d48f93b0eafc0dfa430bf593314ae1fdad61 Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 9 Feb 2021 23:43:54 +0800 Subject: [PATCH] fix: always use ActorSystem's address as AppInfo's currentServer to avoid some problem #209 --- .../server/common/config/ThreadPoolConfig.java | 2 +- .../handler/outer/WorkerRequestHttpHandler.java | 7 +++++++ .../service/ha/DefaultServerElectionService.java | 15 ++++++++------- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index 265b372f..4289f66f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -34,7 +34,7 @@ public class ThreadPoolConfig { executor.setQueueCapacity(0); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("omsTimingPool-"); - executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJob")); + executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJobTiming")); return executor; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java index fc480f5b..d42903f1 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/handler/outer/WorkerRequestHttpHandler.java @@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatu import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq; import com.github.kfcfans.powerjob.common.response.AskResponse; +import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils; import io.vertx.core.AbstractVerticle; @@ -43,6 +44,7 @@ public class WorkerRequestHttpHandler extends AbstractVerticle { .handler(ctx -> { WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class); getWorkerRequestHandler().onReceiveWorkerHeartbeat(heartbeat); + success(ctx); }); router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT) .blockingHandler(ctx -> { @@ -54,6 +56,7 @@ public class WorkerRequestHttpHandler extends AbstractVerticle { .blockingHandler(ctx -> { WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class); getWorkerRequestHandler().onReceiveWorkerLogReportReq(req); + success(ctx); }); server.requestHandler(router).listen(port); } @@ -63,4 +66,8 @@ public class WorkerRequestHttpHandler extends AbstractVerticle { .putHeader("Content-Type", OmsConstant.JSON_MEDIA_TYPE) .end(JsonObject.mapFrom(msg).encode()); } + + private static void success(RoutingContext ctx) { + out(ctx, ResultDTO.success(null)); + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/DefaultServerElectionService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/DefaultServerElectionService.java index 954e805b..054ddba9 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/DefaultServerElectionService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/DefaultServerElectionService.java @@ -55,7 +55,7 @@ public class DefaultServerElectionService implements ServerElectionService { public String elect(Long appId, String protocol, String currentServer) { if (!accurate()) { // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功 - if (getThisServerAddress(protocol).equals(currentServer)) { + if (getProtocolServerAddress(protocol).equals(currentServer)) { return currentServer; } } @@ -98,14 +98,15 @@ public class DefaultServerElectionService implements ServerElectionService { } // 篡位,本机作为Server - appInfo.setCurrentServer(getThisServerAddress(protocol)); + // 注意,写入 AppInfoDO#currentServer 的永远是 ActorSystem 的地址,仅在返回的时候特殊处理 + appInfo.setCurrentServer(transportService.getTransporter(Protocol.AKKA).getAddress()); appInfo.setGmtModified(new Date()); appInfoRepository.saveAndFlush(appInfo); - log.info("[ServerSelectService] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); - return appInfo.getCurrentServer(); + log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); + return getProtocolServerAddress(protocol); }catch (Exception e) { - log.warn("[ServerSelectService] write new server to db failed for app {}.", appName); + log.warn("[ServerElection] write new server to db failed for app {}.", appName); }finally { lockService.unlock(lockName); } @@ -138,7 +139,7 @@ public class DefaultServerElectionService implements ServerElectionService { downServerCache.remove(serverAddress); return response.isSuccess(); }catch (Exception e) { - log.warn("[ServerSelectService] server({}) was down.", serverAddress); + log.warn("[ServerElection] server({}) was down.", serverAddress); } downServerCache.add(serverAddress); return false; @@ -148,7 +149,7 @@ public class DefaultServerElectionService implements ServerElectionService { return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage; } - private String getThisServerAddress(String protocol) { + private String getProtocolServerAddress(String protocol) { Protocol pt = Protocol.of(protocol); return transportService.getTransporter(pt).getAddress(); }