diff --git a/oh-my-scheduler-client/src/test/java/TestClient.java b/oh-my-scheduler-client/src/test/java/TestClient.java index 386191df..cc1ce23e 100644 --- a/oh-my-scheduler-client/src/test/java/TestClient.java +++ b/oh-my-scheduler-client/src/test/java/TestClient.java @@ -1,3 +1,4 @@ +import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.oms.client.OhMyClient; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -18,13 +19,12 @@ public class TestClient { } @Test - public void testInstanceOpenAPI() throws Exception { - System.out.println(ohMyClient.stopInstance(1586855173043L)); - System.out.println(ohMyClient.fetchInstanceStatus(1586855173043L)); + public void testStopInstance() throws Exception { + ResultDTO res = ohMyClient.stopInstance(132522955178508352L); + System.out.println(res.toString()); } - @Test - public void testJobOpenAPI() throws Exception { - System.out.println(ohMyClient.runJob(1L, "hhhh")); + public void testFetchInstanceStatus() throws Exception { + System.out.println(ohMyClient.fetchInstanceStatus(132522955178508352L)); } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java index 6a9438ec..7a14c01c 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java @@ -12,7 +12,7 @@ public class RemoteConstant { /* ************************ AKKA WORKER ************************ */ - public static final int DEFAULT_WORKER_PORT = 2777; + public static final int DEFAULT_WORKER_PORT = 27777; public static final String WORKER_ACTOR_SYSTEM_NAME = "oms"; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/NetUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/NetUtils.java index a1d94886..42313593 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/NetUtils.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/NetUtils.java @@ -29,7 +29,7 @@ public class NetUtils { // valid port range is (0, 65535] private static final int MIN_PORT = 0; - private static final int MAX_PORT = 65535; + public static final int MAX_PORT = 65535; private static final Pattern ADDRESS_PATTERN = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}\\:\\d{1,5}$"); private static final Pattern LOCAL_IP_PATTERN = Pattern.compile("127(\\.\\d{1,3}){3}$"); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java index 56326521..13f211eb 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java @@ -116,7 +116,7 @@ public class DispatchService { req.setThreadConcurrency(jobInfo.getConcurrency()); // 发送请求(不可靠,需要一个后台线程定期轮询状态) - String taskTrackerAddress = allAvailableWorker.get(0); + String taskTrackerAddress = finalWorkers.get(0); ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress); taskTrackerActor.tell(req, null); log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString()); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java index 974dfaa0..5bb02f7d 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java @@ -86,6 +86,7 @@ public class ServerSelectService { appInfo.setGmtModified(new Date()); appInfoRepository.saveAndFlush(appInfo); + log.info("[ServerSelectService] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); return appInfo.getCurrentServer(); }catch (Exception e) { log.warn("[ServerSelectService] write new server to db failed for app {}.", appName); @@ -121,7 +122,7 @@ public class ServerSelectService { downServerCache.remove(serverAddress); return response.isSuccess(); }catch (Exception e) { - log.warn("[ServerSelectService] server({}) was down, I will be the new server.", serverAddress); + log.warn("[ServerSelectService] server({}) was down.", serverAddress); } downServerCache.add(serverAddress); return false; diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/TimeoutProcessor.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/TimeoutProcessor.java new file mode 100644 index 00000000..2528d492 --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/TimeoutProcessor.java @@ -0,0 +1,21 @@ +package com.github.kfcfans.oms.server.processors; + +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; +import org.springframework.stereotype.Component; + +/** + * 测试超时任务 + * + * @author tjq + * @since 2020/4/20 + */ +@Component +public class TimeoutProcessor implements BasicProcessor { + @Override + public ProcessResult process(TaskContext context) throws Exception { + Thread.sleep(Long.parseLong(context.getJobParams())); + return new ProcessResult(true, "impossible~~~~QAQ~"); + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index f02801a6..2a89762f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.worker; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.remote.RemoteTransportException; import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.common.utils.JsonUtils; @@ -81,21 +82,28 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di log.warn("[OhMyWorker] using TestMode now, it's dangerous if this is production env."); } - // 初始化 ActorSystem - Map overrideConfig = Maps.newHashMap(); - int port = NetUtils.getAvailablePort(RemoteConstant.DEFAULT_WORKER_PORT); - overrideConfig.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost()); - overrideConfig.put("akka.remote.artery.canonical.port", port); - workerAddress = NetUtils.getLocalHost() + ":" + port; - log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress); + // 初始化 ActorSystem(macOS上 new ServerSocket 检测端口占用的方法并不生效,可能是AKKA是Scala写的缘故?没办法...只能靠异常重试了) + for (int port = NetUtils.getAvailablePort(RemoteConstant.DEFAULT_WORKER_PORT); port < NetUtils.MAX_PORT; port++) { + try { + Map overrideConfig = Maps.newHashMap(); + overrideConfig.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost()); + overrideConfig.put("akka.remote.artery.canonical.port", port); + workerAddress = NetUtils.getLocalHost() + ":" + port; - Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME); - Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); + Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME); + Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); - actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME); - actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); - log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem); + actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig); + actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME); + actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); + + log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress); + log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem); + break; + }catch (RemoteTransportException ignore) { + log.warn("[OhMyWorker] port:{} already in use, try to use a new port.", port); + } + } // 初始化存储 TaskPersistenceService.INSTANCE.init(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java index 63140841..a7cfb21f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java @@ -32,28 +32,12 @@ public class ProcessorTrackerActor extends AbstractActor { * 处理来自TaskTracker的task执行请求 */ private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { + Long jobId = req.getInstanceInfo().getJobId(); Long instanceId = req.getInstanceInfo().getInstanceId(); - ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> { - try { - ProcessorTracker pt = new ProcessorTracker(req); - log.info("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) success.", jobId, instanceId); - return pt; - }catch (Exception e) { - log.warn("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) failed.", jobId, instanceId, e); - // 直接上报失败 - ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, req.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), e.getMessage(), System.currentTimeMillis()); - getSender().tell(report, getSelf()); - - } - return null; - }); - - // 创建失败,直接返回 - if (processorTracker == null) { - return; - } + // 创建 ProcessorTracker 一定能成功,且每个任务实例只会创建一个 ProcessorTracker + ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> new ProcessorTracker(req)); TaskDO task = new TaskDO(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatus.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatus.java index 79366377..a8d606be 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatus.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/ha/ProcessorTrackerStatus.java @@ -91,7 +91,11 @@ public class ProcessorTrackerStatus { * 是否超时(超过一定时间没有收到心跳) */ public boolean isTimeout() { - return System.currentTimeMillis() - lastActiveTime > HEARTBEAT_TIMEOUT_MS; + if (dispatched) { + return System.currentTimeMillis() - lastActiveTime > HEARTBEAT_TIMEOUT_MS; + } + // 未曾派发过任务的机器,不用处理 + return false; } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index ff2a8da8..c3f9781c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -26,6 +26,7 @@ import org.springframework.util.CollectionUtils; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; /** * 负责管理 Processor 的执行 @@ -54,25 +55,36 @@ public class ProcessorTracker { private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100; + // 当 ProcessorTracker 出现根本性错误(比如 Processor 创建失败,所有的任务直接失败) + private boolean lethal = false; + private String lethalReason; + /** * 创建 ProcessorTracker(其实就是创建了个执行用的线程池 T_T) */ - public ProcessorTracker(TaskTrackerStartTaskReq request) throws Exception { + public ProcessorTracker(TaskTrackerStartTaskReq request) { + try { + // 赋值 + this.startTime = System.currentTimeMillis(); + this.instanceInfo = request.getInstanceInfo(); + this.instanceId = request.getInstanceInfo().getInstanceId(); + this.taskTrackerAddress = request.getTaskTrackerAddress(); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); + this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); - // 赋值 - this.startTime = System.currentTimeMillis(); - this.instanceInfo = request.getInstanceInfo(); - this.instanceId = request.getInstanceInfo().getInstanceId(); - this.taskTrackerAddress = request.getTaskTrackerAddress(); - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); - this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); + // 初始化 线程池 + initThreadPool(); + // 初始化 Processor + initProcessor(); + // 初始化定时任务 + initTimingJob(); - // 初始化 线程池 - initThreadPool(); - // 初始化 Processor - initProcessor(); - // 初始化定时任务 - initTimingJob(); + log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId); + }catch (Exception e) { + log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, e); + lethal = true; + lethalReason = e.toString(); + } } /** @@ -87,6 +99,14 @@ public class ProcessorTracker { */ public void submitTask(TaskDO newTask) { + // 一旦 ProcessorTracker 出现异常,所有提交到此处的任务直接返回失败,防止形成死锁 + // 死锁分析:TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T + if (lethal) { + ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis()); + taskTrackerActorRef.tell(report, null); + return; + } + boolean success = false; // 1. 设置值并提交执行 newTask.setInstanceId(instanceInfo.getInstanceId()); @@ -215,7 +235,7 @@ public class ProcessorTracker { try { processor = SpringUtils.getBean(processorInfo); }catch (Exception e) { - log.warn("[ProcessorRunnable-{}] no spring bean of processor(className={}).", instanceId, processorInfo, e); + log.warn("[ProcessorRunnable-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, e.toString()); } } // 反射加载 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index f85fe675..e7ac59bb 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -71,7 +71,8 @@ public abstract class TaskTracker { BeanUtils.copyProperties(req, instanceInfo); // 特殊处理超时时间 if (instanceInfo.getInstanceTimeoutMS() <= 0) { - instanceInfo.setInstanceTimeoutMS(Long.MAX_VALUE); + // Integer最大值:2147483647,一天的毫秒数:86400000;够执行24天了...要是不满足需求就让开发者手动指定吧 + instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE); } // 赋予时间表达式类型 instanceInfo.setTimeExpressionType(TimeExpressionType.valueOf(req.getTimeExpressionType()).getV()); diff --git a/others/img/oms-console-jobCreator.png b/others/img/oms-console-jobCreator.png index 413ffefc..29a883db 100644 Binary files a/others/img/oms-console-jobCreator.png and b/others/img/oms-console-jobCreator.png differ diff --git a/others/img/oms-console-jobManager.png b/others/img/oms-console-jobManager.png index c8fb6a49..d902898f 100644 Binary files a/others/img/oms-console-jobManager.png and b/others/img/oms-console-jobManager.png differ diff --git a/others/img/oms-console-main.png b/others/img/oms-console-main.png index 29a78cde..2239336d 100644 Binary files a/others/img/oms-console-main.png and b/others/img/oms-console-main.png differ diff --git a/others/img/oms-console-runningStatus.png b/others/img/oms-console-runningStatus.png index 7448d1f2..2dbaec09 100644 Binary files a/others/img/oms-console-runningStatus.png and b/others/img/oms-console-runningStatus.png differ diff --git a/others/logs/TestRecord.md b/others/logs/TestRecord.md index d8abfd0e..a97e9c0b 100644 --- a/others/logs/TestRecord.md +++ b/others/logs/TestRecord.md @@ -91,4 +91,15 @@ java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 2020-04-16 18:05:09 WARN - [TaskTracker-1586857062542] query TaskStatus from DB failed when try to update new TaskStatus(taskId=4,newStatus=6). ``` -解决方案:初步怀疑在连续更改时,由于数据库锁的存在导致行不可见(不知道H2具体的特性)。因此,需要保证同一个taskId串行更新 -> synchronize Yes! \ No newline at end of file +解决方案:初步怀疑在连续更改时,由于数据库锁的存在导致行不可见(不知道H2具体的特性)。因此,需要保证同一个taskId串行更新 -> synchronize Yes! + +# 2020.4.20 1.0.0发布前测试 +#### Server & Worker +* 指定机器执行 -> 验证通过 +* Map/MapReduce/Standalone/Broadcast/Shell/Python处理器的执行 -> 验证通过 +* 超时失败 -> 验证通过 +* 破坏测试:指定错误的处理器 -> 发现问题,会造成死锁(TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T)。通过确保ProcessorTracker一定能创建成功解决,如果处理器构建失败,之后所有提交的任务直接返回错误。 +#### Client +* StopInstance -> success +* FetchInstanceStatus -> success +