diff --git a/oh-my-scheduler-common/pom.xml b/oh-my-scheduler-common/pom.xml
index 361424dd..03d89bd4 100644
--- a/oh-my-scheduler-common/pom.xml
+++ b/oh-my-scheduler-common/pom.xml
@@ -15,6 +15,7 @@
1.7.30
+ 3.10
@@ -24,6 +25,13 @@
slf4j-api
${slf4j.version}
+
+
+
+ org.apache.commons
+ commons-lang3
+ ${commons.lang.version}
+
\ No newline at end of file
diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java
index 8abb9d24..589cf7db 100644
--- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java
@@ -1,6 +1,5 @@
package com.github.kfcfans.common.model;
-import com.sun.istack.internal.NotNull;
import lombok.Data;
import java.io.Serializable;
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ResultDTO.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/ResultDTO.java
similarity index 94%
rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ResultDTO.java
rename to oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/ResultDTO.java
index 4f799c18..6b0bffe5 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/ResultDTO.java
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/ResultDTO.java
@@ -1,4 +1,4 @@
-package com.github.kfcfans.oms.server.web;
+package com.github.kfcfans.common.response;
import lombok.Getter;
import lombok.Setter;
diff --git a/oh-my-scheduler-server/pom.xml b/oh-my-scheduler-server/pom.xml
index e25770ac..e10749f5 100644
--- a/oh-my-scheduler-server/pom.xml
+++ b/oh-my-scheduler-server/pom.xml
@@ -20,7 +20,6 @@
1.0.0-SNAPSHOT
3.4.2
8.0.19
- 3.10
4.3.0
@@ -61,15 +60,6 @@
${curator.version}
-
-
-
- org.apache.commons
- commons-lang3
- ${commons.lang.version}
-
-
-
org.springframework.boot
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/HashedWheelTimer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/HashedWheelTimer.java
index c30988ff..f20c5c50 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/HashedWheelTimer.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/HashedWheelTimer.java
@@ -65,8 +65,8 @@ public class HashedWheelTimer implements Timer {
taskProcessPool = null;
}else {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
- BlockingQueue queue = Queues.newLinkedBlockingQueue(1024);
- taskProcessPool = new ThreadPoolExecutor(processTaskNum, processTaskNum,
+ BlockingQueue queue = Queues.newLinkedBlockingQueue(16);
+ taskProcessPool = new ThreadPoolExecutor(2, processTaskNum,
60, TimeUnit.SECONDS,
queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java
index 97fd7120..43cdf4cd 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java
@@ -41,7 +41,7 @@ public class OhMyServer {
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
- actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_NAME, akkaFinalConfig);
+ actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME);
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
index 004baa42..e6debd6f 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
@@ -8,6 +8,7 @@ import com.github.kfcfans.oms.server.core.akka.Ping;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.service.lock.LockService;
+import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -16,6 +17,7 @@ import javax.annotation.Resource;
import java.time.Duration;
import java.util.Date;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@@ -35,7 +37,7 @@ public class ServerSelectService {
private AppInfoRepository appInfoRepository;
private static final int RETRY_TIMES = 10;
- private static final long PING_TIMEOUT_MS = 5000;
+ private static final long PING_TIMEOUT_MS = 1000;
private static final String SERVER_ELECT_LOCK = "server_elect_%d";
/**
@@ -46,6 +48,8 @@ public class ServerSelectService {
*/
public String getServer(Long appId) {
+ Set downServerCache = Sets.newHashSet();
+
for (int i = 0; i < RETRY_TIMES; i++) {
// 无锁获取当前数据库中的Server
@@ -55,7 +59,7 @@ public class ServerSelectService {
}
String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer();
- if (isActive(originServer)) {
+ if (isActive(originServer, downServerCache)) {
return originServer;
}
@@ -64,7 +68,7 @@ public class ServerSelectService {
boolean lockStatus = lockService.lock(lockName);
if (!lockStatus) {
try {
- Thread.sleep(1000);
+ Thread.sleep(500);
}catch (Exception ignore) {
}
continue;
@@ -73,7 +77,7 @@ public class ServerSelectService {
// 可能上一台机器已经完成了Server选举,需要再次判断
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
- if (isActive(appInfo.getCurrentServer())) {
+ if (isActive(appInfo.getCurrentServer(), downServerCache)) {
return appInfo.getCurrentServer();
}
@@ -92,10 +96,21 @@ public class ServerSelectService {
throw new RuntimeException("server elect failed for app " + appId);
}
- private boolean isActive(String serverAddress) {
+ /**
+ * 判断指定server是否存活
+ * @param serverAddress 需要检测的server地址
+ * @param downServerCache 缓存,防止多次发送PING(这个QPS其实还蛮爆表的...)
+ * @return true -> 存活 / false -> down机
+ */
+ private boolean isActive(String serverAddress, Set downServerCache) {
+
+ if (downServerCache.contains(serverAddress)) {
+ return false;
+ }
if (StringUtils.isEmpty(serverAddress)) {
return false;
}
+
Ping ping = new Ping();
ping.setCurrentTime(System.currentTimeMillis());
@@ -103,10 +118,12 @@ public class ServerSelectService {
try {
CompletionStage
+
+
+ com.squareup.okhttp3
+ okhttp
+ ${okhttp.version}
+
+
+
+
+ com.alibaba
+ fastjson
+ ${fastjson.version}
+
+
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java
index 4cf9a799..107c8848 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java
@@ -2,12 +2,17 @@ package com.github.kfcfans.oms.worker;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import com.alibaba.fastjson.JSONObject;
+import com.github.kfcfans.common.response.ResultDTO;
+import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor;
import com.github.kfcfans.oms.worker.actors.TaskTrackerActor;
+import com.github.kfcfans.oms.worker.background.ServerDiscoveryService;
import com.github.kfcfans.oms.worker.background.WorkerHealthReportRunnable;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.utils.NetUtils;
+import com.github.kfcfans.oms.worker.common.utils.HttpUtils;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.google.common.base.Stopwatch;
@@ -24,6 +29,7 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.util.StringUtils;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -60,13 +66,16 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
init();
}
- public void init() {
+ public void init() throws Exception {
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[OhMyWorker] start to initialize OhMyWorker...");
try {
+ // 校验 appName
+ appId = assertAppName();
+
// 初始化 ActorSystem
Map overrideConfig = Maps.newHashMap();
String localIP = StringUtils.isEmpty(config.getListeningIP()) ? NetUtils.getLocalHost() : config.getListeningIP();
@@ -88,19 +97,56 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
TaskPersistenceService.INSTANCE.init();
log.info("[OhMyWorker] local storage initialized successfully.");
+ // 服务发现
+ currentServer = ServerDiscoveryService.discovery();
+ if (StringUtils.isEmpty(currentServer)) {
+ throw new RuntimeException("can't find any available server, this worker has been quarantined.");
+ }
+ log.info("[OhMyWorker] discovery server succeed, current server is {}.", currentServer);
+
// 初始化定时任务
ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build();
timingPool = Executors.newScheduledThreadPool(2, timingPoolFactory);
- timingPool.scheduleAtFixedRate(new WorkerHealthReportRunnable(), 0, 30, TimeUnit.SECONDS);
-
+ timingPool.scheduleAtFixedRate(new WorkerHealthReportRunnable(), 0, 15, TimeUnit.SECONDS);
+ timingPool.scheduleAtFixedRate(() -> currentServer = ServerDiscoveryService.discovery(), 10, 10, TimeUnit.SECONDS);
log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch);
}catch (Exception e) {
log.error("[OhMyWorker] initialize OhMyWorker failed, using {}.", stopwatch, e);
+ throw e;
}
}
public void setConfig(OhMyConfig config) {
OhMyWorker.config = config;
}
+
+ @SuppressWarnings("rawtypes")
+ private Long assertAppName() {
+
+ String appName = config.getAppName();
+ Objects.requireNonNull(appName, "appName can't be empty!");
+
+ String url = "http://%s/server/assert?appName=%s";
+ for (String server : config.getServerAddress()) {
+ String realUrl = String.format(url, server, appName);
+ try {
+ String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
+ ResultDTO resultDTO = JSONObject.parseObject(resultDTOStr, ResultDTO.class);
+ if (resultDTO.isSuccess()) {
+ Long appId = Long.valueOf(resultDTO.getData().toString());
+ log.info("[OhMyWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
+ return appId;
+ }else {
+ log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
+ throw new IllegalArgumentException("appName invalid!");
+ }
+ }catch (IllegalArgumentException ie) {
+ throw ie;
+ }catch (Exception ignore) {
+ }
+ }
+ log.error("[OhMyWorker] no available server in {}.", config.getServerAddress());
+ throw new RuntimeException("no server available!");
+ }
}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java
new file mode 100644
index 00000000..0618023d
--- /dev/null
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerDiscoveryService.java
@@ -0,0 +1,48 @@
+package com.github.kfcfans.oms.worker.background;
+
+import com.alibaba.fastjson.JSONObject;
+import com.github.kfcfans.common.response.ResultDTO;
+import com.github.kfcfans.common.utils.CommonUtils;
+import com.github.kfcfans.oms.worker.OhMyWorker;
+import com.github.kfcfans.oms.worker.common.utils.HttpUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
+
+/**
+ * 服务发现
+ *
+ * @author tjq
+ * @since 2020/4/6
+ */
+@Slf4j
+public class ServerDiscoveryService {
+
+ private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d¤tServer=%s";
+
+ @SuppressWarnings("rawtypes")
+ public static String discovery() {
+
+ String result = null;
+ for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) {
+
+ String url = String.format(DISCOVERY_URL, httpServerAddress, OhMyWorker.getAppId(), OhMyWorker.getCurrentServer());
+ try {
+ result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
+ }catch (Exception ignore) {
+ }
+
+ if (!StringUtils.isEmpty(result)) {
+
+ ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class);
+ if (resultDTO.isSuccess()) {
+ String server = resultDTO.getData().toString();
+ log.debug("[OMS-ServerDiscoveryService] current server is {}.", server);
+ return server;
+ }
+ }
+ }
+ log.error("[OMS-ServerDiscoveryService] can't find any available server, this worker has been quarantined.");
+ return null;
+ }
+
+}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerSelectionService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerSelectionService.java
deleted file mode 100644
index 212b359b..00000000
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/ServerSelectionService.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.github.kfcfans.oms.worker.background;
-
-import org.springframework.util.StringUtils;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * 服务发现
- *
- * @author tjq
- * @since 2020/3/25
- */
-public class ServerSelectionService {
-
- private static String appName;
- private static List allServerAddress;
- private static String defaultServerAddress;
-
-
- /**
- * 获取默认服务器,同一个 appName 一定对应同一台服务器
- */
- private static String getDefaultServer() {
- if (!StringUtils.isEmpty(defaultServerAddress)) {
- return defaultServerAddress;
- }
-
- Long index = letter2Num(appName);
- defaultServerAddress = allServerAddress.get(index.intValue() % allServerAddress.size());
- return defaultServerAddress;
- }
-
- private static Long letter2Num(String str) {
- if (StringUtils.isEmpty(str)) {
- return 0L;
- }
- AtomicLong res = new AtomicLong(0);
- str.chars().forEach(res::addAndGet);
- return res.get();
- }
-
-}
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java
index 74b87dbb..d66a984f 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java
@@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.common;
import lombok.Data;
+import java.util.List;
+
/**
* Worker 配置文件
*
@@ -15,9 +17,9 @@ public class OhMyConfig {
*/
private String appName;
/**
- * 调度服务器地址,ip:port (多值使用 , 分隔)
+ * 调度服务器地址,ip:port
*/
- private String serverAddress;
+ private List serverAddress;
/**
* 通讯端口
*/
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/HttpUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/HttpUtils.java
new file mode 100644
index 00000000..f92e233a
--- /dev/null
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/HttpUtils.java
@@ -0,0 +1,42 @@
+package com.github.kfcfans.oms.worker.common.utils;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 封装 OkHttpClient
+ *
+ * @author tjq
+ * @since 2020/4/6
+ */
+public class HttpUtils {
+
+ private static OkHttpClient client;
+ private static final int HTTP_SUCCESS_CODE = 200;
+
+ static {
+ client = new OkHttpClient.Builder()
+ .connectTimeout(1, TimeUnit.SECONDS)
+ .readTimeout(5, TimeUnit.SECONDS)
+ .build();
+ }
+
+ public static String get(String url) throws IOException {
+ Request request = new Request.Builder()
+ .get()
+ .url(url)
+ .build();
+ try (Response response = client.newCall(request).execute()) {
+ if (response.code() == HTTP_SUCCESS_CODE) {
+ return Objects.requireNonNull(response.body()).string();
+ }
+ }
+ return null;
+ }
+
+}
diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java
index c400733c..cca7d8a2 100644
--- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java
+++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TaskTrackerTest.java
@@ -26,10 +26,11 @@ public class TaskTrackerTest {
private static ActorSelection remoteTaskTracker;
@BeforeAll
- public static void init() {
+ public static void init() throws Exception {
OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test");
+ ohMyConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700"));
OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig);
worker.init();