fix: always use ActorSystem's address as AppInfo's currentServer to avoid some problem #209

This commit is contained in:
tjq 2021-02-09 23:43:54 +08:00
parent ee9ed3c099
commit 5586d48f93
3 changed files with 16 additions and 8 deletions

View File

@ -34,7 +34,7 @@ public class ThreadPoolConfig {
executor.setQueueCapacity(0); executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(60); executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsTimingPool-"); executor.setThreadNamePrefix("omsTimingPool-");
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJob")); executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJobTiming"));
return executor; return executor;
} }

View File

@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatu
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq; import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq;
import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils; import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils;
import io.vertx.core.AbstractVerticle; import io.vertx.core.AbstractVerticle;
@ -43,6 +44,7 @@ public class WorkerRequestHttpHandler extends AbstractVerticle {
.handler(ctx -> { .handler(ctx -> {
WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class); WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class);
getWorkerRequestHandler().onReceiveWorkerHeartbeat(heartbeat); getWorkerRequestHandler().onReceiveWorkerHeartbeat(heartbeat);
success(ctx);
}); });
router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT) router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT)
.blockingHandler(ctx -> { .blockingHandler(ctx -> {
@ -54,6 +56,7 @@ public class WorkerRequestHttpHandler extends AbstractVerticle {
.blockingHandler(ctx -> { .blockingHandler(ctx -> {
WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class); WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class);
getWorkerRequestHandler().onReceiveWorkerLogReportReq(req); getWorkerRequestHandler().onReceiveWorkerLogReportReq(req);
success(ctx);
}); });
server.requestHandler(router).listen(port); server.requestHandler(router).listen(port);
} }
@ -63,4 +66,8 @@ public class WorkerRequestHttpHandler extends AbstractVerticle {
.putHeader("Content-Type", OmsConstant.JSON_MEDIA_TYPE) .putHeader("Content-Type", OmsConstant.JSON_MEDIA_TYPE)
.end(JsonObject.mapFrom(msg).encode()); .end(JsonObject.mapFrom(msg).encode());
} }
private static void success(RoutingContext ctx) {
out(ctx, ResultDTO.success(null));
}
} }

View File

@ -55,7 +55,7 @@ public class DefaultServerElectionService implements ServerElectionService {
public String elect(Long appId, String protocol, String currentServer) { public String elect(Long appId, String protocol, String currentServer) {
if (!accurate()) { if (!accurate()) {
// 如果是本机就不需要查数据库那么复杂的操作了直接返回成功 // 如果是本机就不需要查数据库那么复杂的操作了直接返回成功
if (getThisServerAddress(protocol).equals(currentServer)) { if (getProtocolServerAddress(protocol).equals(currentServer)) {
return currentServer; return currentServer;
} }
} }
@ -98,14 +98,15 @@ public class DefaultServerElectionService implements ServerElectionService {
} }
// 篡位本机作为Server // 篡位本机作为Server
appInfo.setCurrentServer(getThisServerAddress(protocol)); // 注意写入 AppInfoDO#currentServer 的永远是 ActorSystem 的地址仅在返回的时候特殊处理
appInfo.setCurrentServer(transportService.getTransporter(Protocol.AKKA).getAddress());
appInfo.setGmtModified(new Date()); appInfo.setGmtModified(new Date());
appInfoRepository.saveAndFlush(appInfo); appInfoRepository.saveAndFlush(appInfo);
log.info("[ServerSelectService] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
return appInfo.getCurrentServer(); return getProtocolServerAddress(protocol);
}catch (Exception e) { }catch (Exception e) {
log.warn("[ServerSelectService] write new server to db failed for app {}.", appName); log.warn("[ServerElection] write new server to db failed for app {}.", appName);
}finally { }finally {
lockService.unlock(lockName); lockService.unlock(lockName);
} }
@ -138,7 +139,7 @@ public class DefaultServerElectionService implements ServerElectionService {
downServerCache.remove(serverAddress); downServerCache.remove(serverAddress);
return response.isSuccess(); return response.isSuccess();
}catch (Exception e) { }catch (Exception e) {
log.warn("[ServerSelectService] server({}) was down.", serverAddress); log.warn("[ServerElection] server({}) was down.", serverAddress);
} }
downServerCache.add(serverAddress); downServerCache.add(serverAddress);
return false; return false;
@ -148,7 +149,7 @@ public class DefaultServerElectionService implements ServerElectionService {
return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage; return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage;
} }
private String getThisServerAddress(String protocol) { private String getProtocolServerAddress(String protocol) {
Protocol pt = Protocol.of(protocol); Protocol pt = Protocol.of(protocol);
return transportService.getTransporter(pt).getAddress(); return transportService.getTransporter(pt).getAddress();
} }