diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java
index 474dd3a2..9d88e8a5 100644
--- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/response/AskResponse.java
@@ -17,4 +17,5 @@ import java.io.Serializable;
@AllArgsConstructor
public class AskResponse implements Serializable {
private boolean success;
+ private Object extra;
}
diff --git a/oh-my-scheduler-server/pom.xml b/oh-my-scheduler-server/pom.xml
index 8bcbb04a..e25770ac 100644
--- a/oh-my-scheduler-server/pom.xml
+++ b/oh-my-scheduler-server/pom.xml
@@ -21,6 +21,7 @@
3.4.2
8.0.19
3.10
+ 4.3.0
@@ -53,6 +54,14 @@
${akka.version}
+
+
+ org.apache.curator
+ curator-recipes
+ ${curator.version}
+
+
+
org.apache.commons
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/CuratorConfig.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/CuratorConfig.java
new file mode 100644
index 00000000..c1e09a4a
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/config/CuratorConfig.java
@@ -0,0 +1,36 @@
+package com.github.kfcfans.oms.server.common.config;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * ZooKeeper 连接配置
+ *
+ * @author tjq
+ * @since 2020/4/4
+ */
+@Configuration
+public class CuratorConfig {
+
+ @Value("${zookeeper.address}")
+ private String zkAddress;
+
+ @Bean("omsCurator")
+ public CuratorFramework initCurator() {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .namespace("oms")
+ // zookeeper 地址,多值用 , 分割即可
+ .connectString(zkAddress)
+ .sessionTimeoutMs(1000)
+ .connectionTimeoutMs(1000)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+ .build();
+ client.start();
+ return client;
+ }
+
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/actors/OhMyServer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java
similarity index 72%
rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/actors/OhMyServer.java
rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java
index ed1bc2aa..5ab545c3 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/actors/OhMyServer.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java
@@ -1,5 +1,7 @@
-package com.github.kfcfans.oms.server.core.actors;
+package com.github.kfcfans.oms.server.core.akka;
+import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.github.kfcfans.common.RemoteConstant;
@@ -7,6 +9,7 @@ import com.github.kfcfans.common.utils.NetUtils;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@@ -21,6 +24,7 @@ import java.util.Map;
public class OhMyServer {
public static ActorSystem actorSystem;
+ @Getter
private static String actorSystemAddress;
public void init() {
@@ -41,4 +45,14 @@ public class OhMyServer {
actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME);
}
+
+ /**
+ * 获取 ServerActor 的 ActorSelection
+ * @param address IP:port
+ * @return ActorSelection
+ */
+ public static ActorSelection getServerActor(String address) {
+ String path = String.format("akka://%s@%s/user/%s", RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_ACTOR_NAME);
+ return actorSystem.actorSelection(path);
+ }
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/Ping.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/Ping.java
new file mode 100644
index 00000000..ed069f40
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/Ping.java
@@ -0,0 +1,16 @@
+package com.github.kfcfans.oms.server.core.akka;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * 检测目标机器是否存活
+ *
+ * @author tjq
+ * @since 2020/4/5
+ */
+@Data
+public class Ping implements Serializable {
+ private long currentTime;
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/actors/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java
similarity index 54%
rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/actors/ServerActor.java
rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java
index 0ba32790..14528dce 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/actors/ServerActor.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java
@@ -1,7 +1,8 @@
-package com.github.kfcfans.oms.server.core.actors;
+package com.github.kfcfans.oms.server.core.akka;
import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.WorkerHeartbeat;
+import com.github.kfcfans.common.response.AskResponse;
import lombok.extern.slf4j.Slf4j;
/**
@@ -17,13 +18,22 @@ public class ServerActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder()
.match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat)
+ .match(Ping.class, this::onReceivePing)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
+ /**
+ * 处理存活检测的请求
+ * @param ping 存活检测请求
+ */
+ private void onReceivePing(Ping ping) {
+ AskResponse askResponse = new AskResponse();
+ askResponse.setSuccess(true);
+ askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime());
+ getSender().tell(askResponse, getSelf());
+ }
+
private void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) {
-
-
-
}
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java
index 7f23362d..80c71b16 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/AppInfoDO.java
@@ -23,6 +23,8 @@ public class AppInfoDO {
private String appName;
private String description;
+ private String currentServer;
+
private Date gmtCreate;
private Date gmtModified;
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/OmsLockDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/OmsLockDO.java
index 54ee4e4c..bfb71f2f 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/OmsLockDO.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/OmsLockDO.java
@@ -23,14 +23,14 @@ public class OmsLockDO {
private Long id;
private String lockName;
- private String owner;
+ private String ownerIP;
private Date gmtCreate;
private Date gmtModified;
- public OmsLockDO(String lockName, String owner) {
+ public OmsLockDO(String lockName, String ownerIP) {
this.lockName = lockName;
- this.owner = owner;
+ this.ownerIP = ownerIP;
this.gmtCreate = new Date();
this.gmtModified = this.gmtCreate;
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java
index 55e43696..ac4bc0ae 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/AppInfoRepository.java
@@ -10,4 +10,6 @@ import org.springframework.data.jpa.repository.JpaRepository;
* @since 2020/4/1
*/
public interface AppInfoRepository extends JpaRepository {
+
+ AppInfoDO findByAppName(String appName);
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/OmsLockRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/OmsLockRepository.java
index 36b38262..d2c9a49b 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/OmsLockRepository.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/OmsLockRepository.java
@@ -19,4 +19,6 @@ public interface OmsLockRepository extends JpaRepository {
@Transactional
@Query(value = "delete from oms_lock where lock_name = ?1", nativeQuery = true)
int deleteByLockName(String lockName);
+
+ OmsLockDO findByLockName(String lockName);
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
new file mode 100644
index 00000000..b3c7c634
--- /dev/null
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java
@@ -0,0 +1,111 @@
+package com.github.kfcfans.oms.server.service.ha;
+
+import akka.actor.ActorSelection;
+import akka.pattern.Patterns;
+import com.github.kfcfans.common.response.AskResponse;
+import com.github.kfcfans.oms.server.core.akka.OhMyServer;
+import com.github.kfcfans.oms.server.core.akka.Ping;
+import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
+import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
+import com.github.kfcfans.oms.server.service.lock.LockService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.time.Duration;
+import java.util.Date;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Worker请求分配Server服务
+ *
+ * @author tjq
+ * @since 2020/4/5
+ */
+@Slf4j
+@Service
+public class ServerSelectService {
+
+ @Resource
+ private LockService lockService;
+ @Resource
+ private AppInfoRepository appInfoRepository;
+
+ private static final int RETRY_TIMES = 10;
+ private static final long PING_TIMEOUT_MS = 5000;
+ private static final long WAIT_LOCK_TIME = 1000;
+ private static final String SERVER_ELECT_LOCK = "server_elect_%s";
+
+ /**
+ * 获取某个应用对应的Server
+ * 缺点:如果server死而复生,可能造成worker集群脑裂,不过感觉影响不是很大 & 概率极低,就不管了
+ * @param appName 应用名称
+ * @return 当前可用的Server
+ */
+ public String getServer(String appName) {
+
+ for (int i = 0; i < RETRY_TIMES; i++) {
+
+ // 无锁获取当前数据库中的Server
+ AppInfoDO app = appInfoRepository.findByAppName(appName);
+ if (app == null) {
+ throw new RuntimeException(appName + " is not registered!");
+ }
+ String originServer = app.getCurrentServer();
+ if (isActive(originServer)) {
+ return originServer;
+ }
+
+ // 获取失败,重新进行Server选举,需要加锁
+ String lockName = String.format(SERVER_ELECT_LOCK, appName);
+ boolean lockStatus = lockService.lock(lockName);
+ if (!lockStatus) {
+ try {
+ Thread.sleep(1000);
+ }catch (Exception ignore) {
+ }
+ continue;
+ }
+ try {
+
+ // 可能上一台机器已经完成了Server选举,需要再次判断
+ AppInfoDO appInfo = appInfoRepository.findByAppName(appName);
+ if (isActive(appInfo.getCurrentServer())) {
+ return appInfo.getCurrentServer();
+ }
+
+ // 篡位,本机作为Server
+ appInfo.setCurrentServer(OhMyServer.getActorSystemAddress());
+ appInfo.setGmtModified(new Date());
+
+ appInfoRepository.saveAndFlush(appInfo);
+ return appInfo.getCurrentServer();
+ }catch (Exception e) {
+ log.warn("[ServerSelectService] write new server to db failed for app {}.", appName);
+ }finally {
+ lockService.unlock(lockName);
+ }
+ }
+ throw new RuntimeException("server elect failed for app " + appName);
+ }
+
+ private boolean isActive(String serverAddress) {
+ if (StringUtils.isEmpty(serverAddress)) {
+ return false;
+ }
+ Ping ping = new Ping();
+ ping.setCurrentTime(System.currentTimeMillis());
+
+ ActorSelection serverActor = OhMyServer.getServerActor(serverAddress);
+ try {
+ CompletionStage