develop the server discovery

This commit is contained in:
tjq 2020-04-06 19:41:27 +08:00
parent f1b3edea62
commit a49d2e7e3a
19 changed files with 207 additions and 74 deletions

View File

@ -15,6 +15,7 @@
<properties> <properties>
<slf4j.version>1.7.30</slf4j.version> <slf4j.version>1.7.30</slf4j.version>
<commons.lang.version>3.10</commons.lang.version>
</properties> </properties>
<dependencies> <dependencies>
@ -24,6 +25,13 @@
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version> <version>${slf4j.version}</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.lang.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,6 +1,5 @@
package com.github.kfcfans.common.model; package com.github.kfcfans.common.model;
import com.sun.istack.internal.NotNull;
import lombok.Data; import lombok.Data;
import java.io.Serializable; import java.io.Serializable;

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.oms.server.web; package com.github.kfcfans.common.response;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;

View File

@ -20,7 +20,6 @@
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version> <oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<hikaricp.version>3.4.2</hikaricp.version> <hikaricp.version>3.4.2</hikaricp.version>
<mysql.version>8.0.19</mysql.version> <mysql.version>8.0.19</mysql.version>
<commons.lang.version>3.10</commons.lang.version>
<curator.version>4.3.0</curator.version> <curator.version>4.3.0</curator.version>
</properties> </properties>
@ -61,15 +60,6 @@
<version>${curator.version}</version> <version>${curator.version}</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.lang.version}</version>
</dependency>
<!-- SpringBoot --> <!-- SpringBoot -->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@ -65,8 +65,8 @@ public class HashedWheelTimer implements Timer {
taskProcessPool = null; taskProcessPool = null;
}else { }else {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build(); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(1024); BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(16);
taskProcessPool = new ThreadPoolExecutor(processTaskNum, processTaskNum, taskProcessPool = new ThreadPoolExecutor(2, processTaskNum,
60, TimeUnit.SECONDS, 60, TimeUnit.SECONDS,
queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
} }

View File

@ -41,7 +41,7 @@ public class OhMyServer {
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME); Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_NAME, akkaFinalConfig); actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME); actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME);

View File

@ -8,6 +8,7 @@ import com.github.kfcfans.oms.server.core.akka.Ping;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.service.lock.LockService; import com.github.kfcfans.oms.server.service.lock.LockService;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -16,6 +17,7 @@ import javax.annotation.Resource;
import java.time.Duration; import java.time.Duration;
import java.util.Date; import java.util.Date;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -35,7 +37,7 @@ public class ServerSelectService {
private AppInfoRepository appInfoRepository; private AppInfoRepository appInfoRepository;
private static final int RETRY_TIMES = 10; private static final int RETRY_TIMES = 10;
private static final long PING_TIMEOUT_MS = 5000; private static final long PING_TIMEOUT_MS = 1000;
private static final String SERVER_ELECT_LOCK = "server_elect_%d"; private static final String SERVER_ELECT_LOCK = "server_elect_%d";
/** /**
@ -46,6 +48,8 @@ public class ServerSelectService {
*/ */
public String getServer(Long appId) { public String getServer(Long appId) {
Set<String> downServerCache = Sets.newHashSet();
for (int i = 0; i < RETRY_TIMES; i++) { for (int i = 0; i < RETRY_TIMES; i++) {
// 无锁获取当前数据库中的Server // 无锁获取当前数据库中的Server
@ -55,7 +59,7 @@ public class ServerSelectService {
} }
String appName = appInfoOpt.get().getAppName(); String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer(); String originServer = appInfoOpt.get().getCurrentServer();
if (isActive(originServer)) { if (isActive(originServer, downServerCache)) {
return originServer; return originServer;
} }
@ -64,7 +68,7 @@ public class ServerSelectService {
boolean lockStatus = lockService.lock(lockName); boolean lockStatus = lockService.lock(lockName);
if (!lockStatus) { if (!lockStatus) {
try { try {
Thread.sleep(1000); Thread.sleep(500);
}catch (Exception ignore) { }catch (Exception ignore) {
} }
continue; continue;
@ -73,7 +77,7 @@ public class ServerSelectService {
// 可能上一台机器已经完成了Server选举需要再次判断 // 可能上一台机器已经完成了Server选举需要再次判断
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database.")); AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
if (isActive(appInfo.getCurrentServer())) { if (isActive(appInfo.getCurrentServer(), downServerCache)) {
return appInfo.getCurrentServer(); return appInfo.getCurrentServer();
} }
@ -92,10 +96,21 @@ public class ServerSelectService {
throw new RuntimeException("server elect failed for app " + appId); throw new RuntimeException("server elect failed for app " + appId);
} }
private boolean isActive(String serverAddress) { /**
* 判断指定server是否存活
* @param serverAddress 需要检测的server地址
* @param downServerCache 缓存防止多次发送PING这个QPS其实还蛮爆表的...
* @return true -> 存活 / false -> down机
*/
private boolean isActive(String serverAddress, Set<String> downServerCache) {
if (downServerCache.contains(serverAddress)) {
return false;
}
if (StringUtils.isEmpty(serverAddress)) { if (StringUtils.isEmpty(serverAddress)) {
return false; return false;
} }
Ping ping = new Ping(); Ping ping = new Ping();
ping.setCurrentTime(System.currentTimeMillis()); ping.setCurrentTime(System.currentTimeMillis());
@ -103,10 +118,12 @@ public class ServerSelectService {
try { try {
CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS)); CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS); AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
downServerCache.remove(serverAddress);
return response.isSuccess(); return response.isSuccess();
}catch (Exception e) { }catch (Exception e) {
log.warn("[ServerSelectService] server({}) was down, try to elect a new server.", serverAddress); log.warn("[ServerSelectService] server({}) was down, try to elect a new server.", serverAddress);
} }
downServerCache.add(serverAddress);
return false; return false;
} }
} }

View File

@ -10,7 +10,7 @@ import com.github.kfcfans.oms.server.common.utils.timewheel.HashedWheelTimer;
*/ */
public class HashedWheelTimerHolder { public class HashedWheelTimerHolder {
public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, 4); public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
private HashedWheelTimerHolder() { private HashedWheelTimerHolder() {
} }

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.server.web; package com.github.kfcfans.oms.server.web;
import com.github.kfcfans.common.response.ResultDTO;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ExceptionHandler;

View File

@ -2,7 +2,7 @@ package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.web.ResultDTO; import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.server.web.request.ModifyAppInfoRequest; import com.github.kfcfans.oms.server.web.request.ModifyAppInfoRequest;
import lombok.Data; import lombok.Data;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;

View File

@ -5,7 +5,7 @@ import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.server.common.constans.TimeExpressionType; import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.web.ResultDTO; import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest; import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;

View File

@ -1,9 +1,10 @@
package com.github.kfcfans.oms.server.web.controller; package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.service.ha.ServerSelectService; import com.github.kfcfans.oms.server.service.ha.ServerSelectService;
import com.github.kfcfans.oms.server.web.ResultDTO; import com.github.kfcfans.common.response.ResultDTO;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@ -35,7 +36,12 @@ public class ServerController {
} }
@GetMapping("/acquire") @GetMapping("/acquire")
public ResultDTO<String> acquireServer(Long appId) { public ResultDTO<String> acquireServer(Long appId, String currentServer) {
// 如果是本机就不需要查数据库那么复杂的操作了直接返回成功
if (OhMyServer.getActorSystemAddress().equals(currentServer)) {
return ResultDTO.success(currentServer);
}
String server = serverSelectService.getServer(appId); String server = serverSelectService.getServer(appId);
return ResultDTO.success(server); return ResultDTO.success(server);
} }

View File

@ -22,6 +22,8 @@
<guava.version>28.2-jre</guava.version> <guava.version>28.2-jre</guava.version>
<junit.version>5.6.1</junit.version> <junit.version>5.6.1</junit.version>
<kryo.version>5.0.0-RC5</kryo.version> <kryo.version>5.0.0-RC5</kryo.version>
<fastjson.version>1.2.68</fastjson.version>
<okhttp.version>4.4.1</okhttp.version>
</properties> </properties>
<dependencies> <dependencies>
@ -83,6 +85,20 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- OKHttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
<!-- fastJson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- 开发阶段输出日志 --> <!-- 开发阶段输出日志 -->
<dependency> <dependency>

View File

@ -2,12 +2,17 @@ package com.github.kfcfans.oms.worker;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor; import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor;
import com.github.kfcfans.oms.worker.actors.TaskTrackerActor; import com.github.kfcfans.oms.worker.actors.TaskTrackerActor;
import com.github.kfcfans.oms.worker.background.ServerDiscoveryService;
import com.github.kfcfans.oms.worker.background.WorkerHealthReportRunnable; import com.github.kfcfans.oms.worker.background.WorkerHealthReportRunnable;
import com.github.kfcfans.oms.worker.common.OhMyConfig; import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.utils.NetUtils; import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.common.utils.HttpUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils; import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
@ -24,6 +29,7 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -60,13 +66,16 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
init(); init();
} }
public void init() { public void init() throws Exception {
Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[OhMyWorker] start to initialize OhMyWorker..."); log.info("[OhMyWorker] start to initialize OhMyWorker...");
try { try {
// 校验 appName
appId = assertAppName();
// 初始化 ActorSystem // 初始化 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap(); Map<String, Object> overrideConfig = Maps.newHashMap();
String localIP = StringUtils.isEmpty(config.getListeningIP()) ? NetUtils.getLocalHost() : config.getListeningIP(); String localIP = StringUtils.isEmpty(config.getListeningIP()) ? NetUtils.getLocalHost() : config.getListeningIP();
@ -88,19 +97,56 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
TaskPersistenceService.INSTANCE.init(); TaskPersistenceService.INSTANCE.init();
log.info("[OhMyWorker] local storage initialized successfully."); log.info("[OhMyWorker] local storage initialized successfully.");
// 服务发现
currentServer = ServerDiscoveryService.discovery();
if (StringUtils.isEmpty(currentServer)) {
throw new RuntimeException("can't find any available server, this worker has been quarantined.");
}
log.info("[OhMyWorker] discovery server succeed, current server is {}.", currentServer);
// 初始化定时任务 // 初始化定时任务
ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build(); ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build();
timingPool = Executors.newScheduledThreadPool(2, timingPoolFactory); timingPool = Executors.newScheduledThreadPool(2, timingPoolFactory);
timingPool.scheduleAtFixedRate(new WorkerHealthReportRunnable(), 0, 30, TimeUnit.SECONDS); timingPool.scheduleAtFixedRate(new WorkerHealthReportRunnable(), 0, 15, TimeUnit.SECONDS);
timingPool.scheduleAtFixedRate(() -> currentServer = ServerDiscoveryService.discovery(), 10, 10, TimeUnit.SECONDS);
log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch); log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch);
}catch (Exception e) { }catch (Exception e) {
log.error("[OhMyWorker] initialize OhMyWorker failed, using {}.", stopwatch, e); log.error("[OhMyWorker] initialize OhMyWorker failed, using {}.", stopwatch, e);
throw e;
} }
} }
public void setConfig(OhMyConfig config) { public void setConfig(OhMyConfig config) {
OhMyWorker.config = config; OhMyWorker.config = config;
} }
@SuppressWarnings("rawtypes")
private Long assertAppName() {
String appName = config.getAppName();
Objects.requireNonNull(appName, "appName can't be empty!");
String url = "http://%s/server/assert?appName=%s";
for (String server : config.getServerAddress()) {
String realUrl = String.format(url, server, appName);
try {
String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
ResultDTO resultDTO = JSONObject.parseObject(resultDTOStr, ResultDTO.class);
if (resultDTO.isSuccess()) {
Long appId = Long.valueOf(resultDTO.getData().toString());
log.info("[OhMyWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
return appId;
}else {
log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
throw new IllegalArgumentException("appName invalid!");
}
}catch (IllegalArgumentException ie) {
throw ie;
}catch (Exception ignore) {
}
}
log.error("[OhMyWorker] no available server in {}.", config.getServerAddress());
throw new RuntimeException("no server available!");
}
} }

View File

@ -0,0 +1,48 @@
package com.github.kfcfans.oms.worker.background;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.utils.HttpUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
/**
* 服务发现
*
* @author tjq
* @since 2020/4/6
*/
@Slf4j
public class ServerDiscoveryService {
private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d&currentServer=%s";
@SuppressWarnings("rawtypes")
public static String discovery() {
String result = null;
for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) {
String url = String.format(DISCOVERY_URL, httpServerAddress, OhMyWorker.getAppId(), OhMyWorker.getCurrentServer());
try {
result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
}catch (Exception ignore) {
}
if (!StringUtils.isEmpty(result)) {
ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
String server = resultDTO.getData().toString();
log.debug("[OMS-ServerDiscoveryService] current server is {}.", server);
return server;
}
}
}
log.error("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined.");
return null;
}
}

View File

@ -1,43 +0,0 @@
package com.github.kfcfans.oms.worker.background;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* 服务发现
*
* @author tjq
* @since 2020/3/25
*/
public class ServerSelectionService {
private static String appName;
private static List<String> allServerAddress;
private static String defaultServerAddress;
/**
* 获取默认服务器同一个 appName 一定对应同一台服务器
*/
private static String getDefaultServer() {
if (!StringUtils.isEmpty(defaultServerAddress)) {
return defaultServerAddress;
}
Long index = letter2Num(appName);
defaultServerAddress = allServerAddress.get(index.intValue() % allServerAddress.size());
return defaultServerAddress;
}
private static Long letter2Num(String str) {
if (StringUtils.isEmpty(str)) {
return 0L;
}
AtomicLong res = new AtomicLong(0);
str.chars().forEach(res::addAndGet);
return res.get();
}
}

View File

@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.common;
import lombok.Data; import lombok.Data;
import java.util.List;
/** /**
* Worker 配置文件 * Worker 配置文件
* *
@ -15,9 +17,9 @@ public class OhMyConfig {
*/ */
private String appName; private String appName;
/** /**
* 调度服务器地址ip:port 多值使用 , 分隔 * 调度服务器地址ip:port
*/ */
private String serverAddress; private List<String> serverAddress;
/** /**
* 通讯端口 * 通讯端口
*/ */

View File

@ -0,0 +1,42 @@
package com.github.kfcfans.oms.worker.common.utils;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 封装 OkHttpClient
*
* @author tjq
* @since 2020/4/6
*/
public class HttpUtils {
private static OkHttpClient client;
private static final int HTTP_SUCCESS_CODE = 200;
static {
client = new OkHttpClient.Builder()
.connectTimeout(1, TimeUnit.SECONDS)
.readTimeout(5, TimeUnit.SECONDS)
.build();
}
public static String get(String url) throws IOException {
Request request = new Request.Builder()
.get()
.url(url)
.build();
try (Response response = client.newCall(request).execute()) {
if (response.code() == HTTP_SUCCESS_CODE) {
return Objects.requireNonNull(response.body()).string();
}
}
return null;
}
}

View File

@ -26,10 +26,11 @@ public class TaskTrackerTest {
private static ActorSelection remoteTaskTracker; private static ActorSelection remoteTaskTracker;
@BeforeAll @BeforeAll
public static void init() { public static void init() throws Exception {
OhMyConfig ohMyConfig = new OhMyConfig(); OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test"); ohMyConfig.setAppName("oms-test");
ohMyConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700"));
OhMyWorker worker = new OhMyWorker(); OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig); worker.setConfig(ohMyConfig);
worker.init(); worker.init();