diff --git a/oh-my-scheduler-common/pom.xml b/oh-my-scheduler-common/pom.xml
index 03d89bd4..32252222 100644
--- a/oh-my-scheduler-common/pom.xml
+++ b/oh-my-scheduler-common/pom.xml
@@ -16,6 +16,7 @@
1.7.30
3.10
+ 28.2-jre
@@ -32,6 +33,13 @@
commons-lang3
${commons.lang.version}
+
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
\ No newline at end of file
diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java
index 1bb1f815..be6b9e0c 100644
--- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java
@@ -1,8 +1,11 @@
package com.github.kfcfans.common;
+import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
+import java.util.List;
+
/**
* 任务运行状态
*
@@ -13,7 +16,7 @@ import lombok.Getter;
@AllArgsConstructor
public enum InstanceStatus {
- WAITING_DISPATCH(1, "等待任务派发,任务处理Server时间轮中"),
+ WAITING_DISPATCH(1, "等待任务派发"),
WAITING_WORKER_RECEIVE(2, "Server已完成任务派发,等待Worker接收"),
RUNNING(3, "Worker接收成功,正在运行任务"),
FAILED(4, "任务运行失败"),
@@ -23,6 +26,9 @@ public enum InstanceStatus {
private int v;
private String des;
+ // 广义的运行状态
+ public static final List generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
+
public static InstanceStatus of(int v) {
for (InstanceStatus is : values()) {
if (v == is.v) {
diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/SystemInstanceResult.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/SystemInstanceResult.java
new file mode 100644
index 00000000..d7d53918
--- /dev/null
+++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/SystemInstanceResult.java
@@ -0,0 +1,29 @@
+package com.github.kfcfans.common;
+
+/**
+ * 系统生成的任务实例运行结果
+ *
+ * @author tjq
+ * @since 2020/4/11
+ */
+public class SystemInstanceResult {
+
+ // 同时运行的任务实例数过多
+ public static final String TOO_MUCH_INSTANCE = "too much instance(%d>%d)";
+ // 无可用worker
+ public static final String NO_WORKER_AVAILABLE = "no worker available";
+ // 任务执行超时
+ public static final String INSTANCE_EXECUTE_TIMEOUT = "instance execute timeout";
+ // 创建根任务失败
+ public static final String TASK_INIT_FAILED = "create root task failed";
+ // 未知错误
+ public static final String UNKNOWN_BUG = "unknown bug";
+ // TaskTracker 长时间未上报
+ public static final String REPORT_TIMEOUT = "worker report timeout, maybe TaskTracker down";
+
+
+ // 被用户手动停止
+ public static final String STOPPED_BY_USER = "stopped by user";
+
+
+}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java
index 70efa3e2..f8b2b775 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java
@@ -59,7 +59,7 @@ public class OhMyServer {
* @param address IP:port
* @return ActorSelection
*/
- public static ActorSelection getServerActor(String address) {
+ public static ActorSelection getFriendActor(String address) {
String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java
index 5ddc9f4c..620e6767 100644
--- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java
+++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java
@@ -1,25 +1,9 @@
package com.github.kfcfans.oms.server.akka.actors;
import akka.actor.AbstractActor;
-import akka.pattern.Patterns;
-import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
-import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse;
-import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.requests.Ping;
-import com.github.kfcfans.oms.server.akka.requests.RedirectServerQueryInstanceStatusReq;
-import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq;
-import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
-import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.Serializable;
-import java.time.Duration;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-
-import static com.github.kfcfans.common.RemoteConstant.DEFAULT_TIMEOUT_MS;
/**
* 处理朋友们的信息(处理服务器与服务器之间的通讯)
@@ -33,7 +17,6 @@ public class FriendActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder()
.match(Ping.class, this::onReceivePing)
- .match(RedirectServerStopInstanceReq.class, this::onReceiveRedirectServerStopInstanceReq)
.matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj))
.build();
}
@@ -47,47 +30,4 @@ public class FriendActor extends AbstractActor {
askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime());
getSender().tell(askResponse, getSelf());
}
-
- /**
- * 处理停止任务实例的请求
- */
- private void onReceiveRedirectServerStopInstanceReq(RedirectServerStopInstanceReq req) {
-
- Long instanceId = req.getServerStopInstanceReq().getInstanceId();
- String taskTrackerAddress = InstanceManager.getTaskTrackerAddress(instanceId);
-
- // 非空,发请求停止任务实例
- if (StringUtils.isNotEmpty(taskTrackerAddress)) {
- OhMyServer.getTaskTrackerActor(taskTrackerAddress).tell(req.getServerStopInstanceReq(), getSelf());
- return;
- }
-
- // 空,可能刚经历 Server 变更 或 TaskTracker 宕机。先忽略吧,打条日志压压惊
- log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}), so stop instance may fail.", instanceId);
- }
-
- /**
- * 处理Server查询任务实例运行情况的请求
- */
- private void onReceiveRedirectServerQueryInstanceStatusReq(RedirectServerQueryInstanceStatusReq req) {
-
- Long instanceId = req.getReq().getInstanceId();
- String taskTrackerAddress = InstanceManager.getTaskTrackerAddress(instanceId);
- AskResponse response = new AskResponse();
- if (StringUtils.isEmpty(taskTrackerAddress)) {
- response.setSuccess(false);
- response.setExtra("can't find TaskTracker");
- log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}).", instanceId);
- }else {
- try {
- CompletionStage