From a42bc496dfa7017af8da20f4f99fce8dcefe1789 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 9 Apr 2020 16:50:46 +0800 Subject: [PATCH] prepare to go to my brother's birthday party --- .../github/kfcfans/common/RemoteConstant.java | 1 + .../common/request/ServerScheduleJobReq.java | 3 + .../common/request/ServerStopInstanceReq.java | 20 ++++++ .../TaskTrackerReportInstanceStatusReq.java | 1 + .../kfcfans/oms/server/OhMyApplication.java | 2 +- .../server/{core => }/akka/OhMyServer.java | 7 +- .../oms/server/akka/actors/FriendActor.java | 61 +++++++++++++++++ .../akka => akka/actors}/ServerActor.java | 17 ++--- .../{core/akka => akka/requests}/Ping.java | 2 +- .../RedirectServerStopInstanceReq.java | 18 +++++ .../oms/server/common/constans/JobStatus.java | 8 +++ .../persistence/model/ExecuteLogDO.java | 2 +- .../oms/server/service/DispatchService.java | 2 +- .../service/ha/ServerSelectService.java | 4 +- .../instance}/InstanceManager.java | 18 ++++- .../instance}/InstanceStatusHolder.java | 4 +- .../timing/InstanceStatusCheckService.java | 36 +++++++++- .../timing/schedule/JobScheduleService.java | 51 ++------------- .../web/controller/InstanceController.java | 65 +++++++++++++++++++ .../server/web/controller/JobController.java | 50 +++++++++++++- .../web/controller/ServerController.java | 2 +- .../core/tracker/task/CommonTaskTracker.java | 2 + .../tracker/task/FrequentTaskTracker.java | 14 +++- 23 files changed, 311 insertions(+), 79 deletions(-) create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerStopInstanceReq.java rename oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/{core => }/akka/OhMyServer.java (87%) create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java rename oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/{core/akka => akka/actors}/ServerActor.java (76%) rename oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/{core/akka => akka/requests}/Ping.java (80%) create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerStopInstanceReq.java rename oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/{core => service/instance}/InstanceManager.java (92%) rename oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/{core => service/instance}/InstanceStatusHolder.java (76%) create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java index 57953994..76a317d9 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java @@ -24,6 +24,7 @@ public class RemoteConstant { public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server"; public static final String SERVER_ACTOR_NAME = "server_actor"; + public static final String SERVER_FRIEND_ACTOR_NAME = "friend_actor"; public static final String SERVER_AKKA_CONFIG_NAME = "oms-server.akka.conf"; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java index 8a350143..4c41068d 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java @@ -65,4 +65,7 @@ public class ServerScheduleJobReq implements Serializable { // 时间表达式,CRON/NULL/LONG/LONG(单位MS) private String timeExpression; + // 最大同时运行任务数,默认 1 + private Integer maxInstanceNum; + } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerStopInstanceReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerStopInstanceReq.java new file mode 100644 index 00000000..cc27ac19 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerStopInstanceReq.java @@ -0,0 +1,20 @@ +package com.github.kfcfans.common.request; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 服务器要求任务实例停止执行请求 + * + * @author tjq + * @since 2020/4/9 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ServerStopInstanceReq implements Serializable { + private Long instanceId; +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java index b6635c5b..c19d4ecc 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java @@ -26,4 +26,5 @@ public class TaskTrackerReportInstanceStatusReq implements Serializable { private long failedTaskNum; private long reportTime; + private String sourceAddress; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java index d69313a4..ffeee5bc 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/OhMyApplication.java @@ -1,6 +1,6 @@ package com.github.kfcfans.oms.server; -import com.github.kfcfans.oms.server.core.akka.OhMyServer; +import com.github.kfcfans.oms.server.akka.OhMyServer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java similarity index 87% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java index e7ee491f..70efa3e2 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/OhMyServer.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java @@ -1,10 +1,12 @@ -package com.github.kfcfans.oms.server.core.akka; +package com.github.kfcfans.oms.server.akka; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.utils.NetUtils; +import com.github.kfcfans.oms.server.akka.actors.FriendActor; +import com.github.kfcfans.oms.server.akka.actors.ServerActor; import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -47,6 +49,7 @@ public class OhMyServer { actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig); actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME); + actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME); log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch); } @@ -57,7 +60,7 @@ public class OhMyServer { * @return ActorSelection */ public static ActorSelection getServerActor(String address) { - String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_ACTOR_NAME); + String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME); return actorSystem.actorSelection(path); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java new file mode 100644 index 00000000..9ea752d4 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java @@ -0,0 +1,61 @@ +package com.github.kfcfans.oms.server.akka.actors; + +import akka.actor.AbstractActor; +import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; +import com.github.kfcfans.common.request.WorkerHeartbeat; +import com.github.kfcfans.common.response.AskResponse; +import com.github.kfcfans.oms.server.akka.OhMyServer; +import com.github.kfcfans.oms.server.akka.requests.Ping; +import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq; +import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; +import com.github.kfcfans.oms.server.service.instance.InstanceManager; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +/** + * 处理朋友们的信息(处理服务器与服务器之间的通讯) + * + * @author tjq + * @since 2020/4/9 + */ +@Slf4j +public class FriendActor extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .match(Ping.class, this::onReceivePing) + .match(RedirectServerStopInstanceReq.class, this::onReceiveRedirectServerStopInstanceReq) + .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) + .build(); + } + + /** + * 处理存活检测的请求 + * @param ping 存活检测请求 + */ + private void onReceivePing(Ping ping) { + AskResponse askResponse = new AskResponse(); + askResponse.setSuccess(true); + askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime()); + getSender().tell(askResponse, getSelf()); + } + + /** + * 处理停止任务实例的请求 + * @param req 停止运行任务实例 + */ + private void onReceiveRedirectServerStopInstanceReq(RedirectServerStopInstanceReq req) { + + Long instanceId = req.getServerStopInstanceReq().getInstanceId(); + String taskTrackerAddress = InstanceManager.getTaskTrackerAddress(instanceId); + + // 非空,发请求停止任务实例 + if (StringUtils.isNotEmpty(taskTrackerAddress)) { + OhMyServer.getTaskTrackerActor(taskTrackerAddress).tell(req.getServerStopInstanceReq(), getSelf()); + return; + } + + // 空,可能刚经历 Server 变更 或 TaskTracker 宕机。先忽略吧,打条日志压压惊 + log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}), so stop instance may fail.", instanceId); + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java similarity index 76% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java index 1b5b04e0..0da92ce0 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/ServerActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java @@ -1,10 +1,11 @@ -package com.github.kfcfans.oms.server.core.akka; +package com.github.kfcfans.oms.server.akka.actors; import akka.actor.AbstractActor; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.request.WorkerHeartbeat; import com.github.kfcfans.common.response.AskResponse; -import com.github.kfcfans.oms.server.core.InstanceManager; +import com.github.kfcfans.oms.server.akka.requests.Ping; +import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; import lombok.extern.slf4j.Slf4j; @@ -22,21 +23,11 @@ public class ServerActor extends AbstractActor { return receiveBuilder() .match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat) .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) - .match(Ping.class, this::onReceivePing) .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) .build(); } - /** - * 处理存活检测的请求 - * @param ping 存活检测请求 - */ - private void onReceivePing(Ping ping) { - AskResponse askResponse = new AskResponse(); - askResponse.setSuccess(true); - askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime()); - getSender().tell(askResponse, getSelf()); - } + /** * 处理 Worker 的心跳请求 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/Ping.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/Ping.java similarity index 80% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/Ping.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/Ping.java index ed069f40..caad7d57 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/akka/Ping.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/Ping.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.oms.server.core.akka; +package com.github.kfcfans.oms.server.akka.requests; import lombok.Data; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerStopInstanceReq.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerStopInstanceReq.java new file mode 100644 index 00000000..f652d472 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerStopInstanceReq.java @@ -0,0 +1,18 @@ +package com.github.kfcfans.oms.server.akka.requests; + +import com.github.kfcfans.common.request.ServerStopInstanceReq; +import lombok.Data; + +import java.io.Serializable; + +/** + * 重定向 ServerStopInstanceReq + * 被HTTP请求停止任务实例的机器需要将请求转发到该实例对应的Server上处理,由该Server下发到Worker(只有该Server持有Worker的地址信息) + * + * @author tjq + * @since 2020/4/9 + */ +@Data +public class RedirectServerStopInstanceReq implements Serializable { + private ServerStopInstanceReq serverStopInstanceReq; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java index 0b2df785..0648633e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/constans/JobStatus.java @@ -19,4 +19,12 @@ public enum JobStatus { private int v; + public static JobStatus of(int v) { + for (JobStatus type : values()) { + if (type.v == v) { + return type; + } + } + throw new IllegalArgumentException("unknown JobStatus of " + v); + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java index 535313c7..8bc94bac 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java @@ -37,7 +37,7 @@ public class ExecuteLogDO { // 结束时间 private Long finishedTime; - // 总共执行的次数(CRON任务 -> 代表重试次数,FREQUENT -> 代表总执行次数) + // 总共执行的次数(用于重试判断) private Long runningTimes; private Date gmtCreate; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java index ca8d0d82..857602e9 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java @@ -5,7 +5,7 @@ import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.ProcessorType; import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.common.request.ServerScheduleJobReq; -import com.github.kfcfans.oms.server.core.akka.OhMyServer; +import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java index 74b3ce76..978ce31a 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java @@ -3,8 +3,8 @@ package com.github.kfcfans.oms.server.service.ha; import akka.actor.ActorSelection; import akka.pattern.Patterns; import com.github.kfcfans.common.response.AskResponse; -import com.github.kfcfans.oms.server.core.akka.OhMyServer; -import com.github.kfcfans.oms.server.core.akka.Ping; +import com.github.kfcfans.oms.server.akka.OhMyServer; +import com.github.kfcfans.oms.server.akka.requests.Ping; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; import com.github.kfcfans.oms.server.service.lock.LockService; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java similarity index 92% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceManager.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index 564e81e9..d12abb43 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.oms.server.core; +package com.github.kfcfans.oms.server.service.instance; import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; @@ -119,11 +119,25 @@ public class InstanceManager { // 清除已完成的实例信息 if (finished) { - instanceId2JobInfo.remove(instanceId); instanceId2StatusHolder.remove(instanceId); + // 这一步也可能导致后面取不到 JobInfoDO + instanceId2JobInfo.remove(instanceId); } } + /** + * 获取某个任务实例对应的 TaskTracker 地址 + * @param instanceId 任务实例ID + * @return TaskTracker地址,IP:Port + */ + public static String getTaskTrackerAddress(Long instanceId) { + InstanceStatusHolder statusHolder = instanceId2StatusHolder.get(instanceId); + if (statusHolder == null) { + return null; + } + return statusHolder.getSourceAddress(); + } + private static ExecuteLogRepository getExecuteLogRepository() { while (executeLogRepository == null) { try { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceStatusHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceStatusHolder.java similarity index 76% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceStatusHolder.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceStatusHolder.java index 260a8fd7..cd3e42cc 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/core/InstanceStatusHolder.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceStatusHolder.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.oms.server.core; +package com.github.kfcfans.oms.server.service.instance; import lombok.Data; @@ -22,4 +22,6 @@ public class InstanceStatusHolder { // 上次上报时间 private long lastReportTime; + // 源地址(TaskTracker 地址) + private String sourceAddress; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java index 3ecb1d3c..8bce9b01 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java @@ -1,7 +1,9 @@ package com.github.kfcfans.oms.server.service.timing; import com.github.kfcfans.common.InstanceStatus; -import com.github.kfcfans.oms.server.core.akka.OhMyServer; +import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.oms.server.common.constans.JobStatus; +import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; @@ -98,12 +100,40 @@ public class InstanceStatusCheckService { if (!CollectionUtils.isEmpty(failedInstances)) { log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances); failedInstances.forEach(instance -> { - // 重新派发 + JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); - dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); + TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); + JobStatus jobStatus = JobStatus.of(jobInfoDO.getStatus()); + + // 如果任务已关闭,则不进行重试,将任务置为失败即可 + if (jobStatus != JobStatus.ENABLE) { + updateFailedInstance(instance); + return; + } + + // 秒级任务,无限重试,直接派发 + if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) { + dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); + return; + } + + // CRON 和 API一样,失败次数 + 1,根据重试配置进行重试 + if (instance.getRunningTimes() > jobInfoDO.getInstanceRetryNum()) { + dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); + }else { + updateFailedInstance(instance); + } + }); } }); } + private void updateFailedInstance(ExecuteLogDO instance) { + instance.setStatus(InstanceStatus.FAILED.getV()); + instance.setFinishedTime(System.currentTimeMillis()); + instance.setGmtModified(new Date()); + instance.setResult("worker report timeout, maybe all worker down"); + executeLogRepository.saveAndFlush(instance); + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java index a89d846c..30503ee3 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java @@ -4,8 +4,8 @@ import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.oms.server.common.utils.CronExpression; -import com.github.kfcfans.oms.server.core.InstanceManager; -import com.github.kfcfans.oms.server.core.akka.OhMyServer; +import com.github.kfcfans.oms.server.service.instance.InstanceManager; +import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; @@ -29,11 +29,11 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.stream.Collectors; /** - * 任务调度执行服务 + * 任务调度执行服务(调度 CRON 表达式的任务进行执行) + * FIX_RATE和FIX_DELAY任务不需要被调度,创建后直接被派发到Worker执行,只需要失败重试机制(在InstanceStatusCheckService中完成) * * @author tjq * @since 2020/4/5 @@ -75,13 +75,6 @@ public class JobScheduleService { }catch (Exception e) { log.error("[JobScheduleService] schedule cron job failed.", e); } - - // 调度 FIX_RATE 和 FIX_DELAY JOB - try { - scheduleFrequentJob(allAppIds); - }catch (Exception e) { - log.error("[JobScheduleService] schedule frequent job failed.", e); - } log.info("[JobScheduleService] finished job schedule, using time {}.", stopwatch.stop()); } @@ -177,40 +170,4 @@ public class JobScheduleService { } }); } - - /** - * 调度 FIX_RATE 和 FIX_DELAY 的任务 - */ - private void scheduleFrequentJob(List appIds) { - - List fixDelayJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionType(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_DELAY.getV()); - List fixRateJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionType(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_RATE.getV()); - - List jobIds = Lists.newLinkedList(); - Map jobId2JobInfo = Maps.newHashMap(); - Consumer consumer = jobInfo -> { - jobIds.add(jobInfo.getId()); - jobId2JobInfo.put(jobInfo.getId(), jobInfo); - }; - fixDelayJobs.forEach(consumer); - fixRateJobs.forEach(consumer); - - if (CollectionUtils.isEmpty(jobIds)) { - log.info("[JobScheduleService] no frequent job need to schedule for appIds in {}.", appIds); - return; - } - - // 查询 ExecuteLog 表,不存在或非运行状态则重新调度 - List executeLogDOS = executeLogRepository.findByJobIdIn(jobIds); - executeLogDOS.forEach(executeLogDO -> { - if (executeLogDO.getStatus() == InstanceStatus.RUNNING.getV()) { - jobId2JobInfo.remove(executeLogDO.getJobId()); - } - }); - - // 重新Dispatch - jobId2JobInfo.values().forEach(jobInfoDO -> { - - }); - } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java new file mode 100644 index 00000000..69472c24 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java @@ -0,0 +1,65 @@ +package com.github.kfcfans.oms.server.web.controller; + +import akka.actor.ActorSelection; +import com.github.kfcfans.common.request.ServerStopInstanceReq; +import com.github.kfcfans.common.response.ResultDTO; +import com.github.kfcfans.oms.server.akka.OhMyServer; +import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq; +import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; +import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; +import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; +import com.github.kfcfans.oms.server.service.ha.ServerSelectService; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; + +/** + * 任务实例 Controller + * + * @author tjq + * @since 2020/4/9 + */ +@RestController +@RequestMapping("/instance") +public class InstanceController { + + @Resource + private ExecuteLogRepository executeLogRepository; + @Resource + private JobInfoRepository jobInfoRepository; + @Resource + private AppInfoRepository appInfoRepository; + + @GetMapping("/stop") + public ResultDTO stopInstance(Long instanceId) throws Exception { + + // 联级查询:instanceId -> jobId -> appId -> serverAddress + ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId); + if (executeLogDO == null) { + return ResultDTO.failed("invalid instanceId: " + instanceId); + } + + JobInfoDO jobInfoDO = jobInfoRepository.findById(executeLogDO.getJobId()).orElseThrow(() -> { + throw new RuntimeException("impossible"); + }); + + AppInfoDO appInfoDO = appInfoRepository.findById(jobInfoDO.getAppId()).orElseThrow(() -> { + throw new RuntimeException("impossible"); + }); + + String serverAddress = appInfoDO.getCurrentServer(); + + // 将请求转发给目标Server(HTTP -> AKKA) + ActorSelection serverActor = OhMyServer.getServerActor(serverAddress); + RedirectServerStopInstanceReq req = new RedirectServerStopInstanceReq(); + req.setServerStopInstanceReq(new ServerStopInstanceReq(instanceId)); + serverActor.tell(req, null); + + return ResultDTO.success(null); + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java index f1a6db9b..e9a113e7 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java @@ -1,12 +1,18 @@ package com.github.kfcfans.oms.server.web.controller; import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.ProcessorType; import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.utils.CronExpression; +import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.github.kfcfans.oms.server.service.DispatchService; +import com.github.kfcfans.oms.server.service.IdGenerateService; import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest; import org.springframework.beans.BeanUtils; import org.springframework.web.bind.annotation.GetMapping; @@ -16,6 +22,7 @@ import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.Date; +import java.util.Optional; /** * 任务信息管理 Controller @@ -23,12 +30,16 @@ import java.util.Date; * @author tjq * @since 2020/3/30 */ -@RestController() +@RestController @RequestMapping("job") public class JobController { + @Resource + private DispatchService dispatchService; @Resource private JobInfoRepository jobInfoRepository; + @Resource + private ExecuteLogRepository executeLogRepository; @PostMapping("/save") public ResultDTO saveJobInfo(ModifyJobInfoRequest request) throws Exception { @@ -54,13 +65,46 @@ public class JobController { } jobInfoDO.setGmtModified(now); jobInfoRepository.saveAndFlush(jobInfoDO); + + // 秒级任务直接调度执行 + if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) { + + ExecuteLogDO executeLog = new ExecuteLogDO(); + executeLog.setJobId(jobInfoDO.getId()); + executeLog.setInstanceId(IdGenerateService.allocate()); + executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); + executeLog.setExpectedTriggerTime(System.currentTimeMillis()); + executeLog.setGmtCreate(new Date()); + executeLog.setGmtModified(executeLog.getGmtCreate()); + + executeLogRepository.saveAndFlush(executeLog); + dispatchService.dispatch(jobInfoDO, executeLog.getInstanceId(), 0); + } + + return ResultDTO.success(null); + } + + @GetMapping("/stop") + public ResultDTO stopJob(Long jobId) throws Exception { + updateJobStatus(jobId, JobStatus.STOPPED); return ResultDTO.success(null); } @GetMapping("/delete") - public ResultDTO deleteJobInfo(Long jobId) { - jobInfoRepository.deleteById(jobId); + public ResultDTO deleteJob(Long jobId) throws Exception { + updateJobStatus(jobId, JobStatus.DELETED); return ResultDTO.success(null); } + private void updateJobStatus(Long jobId, JobStatus status) { + JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> { + throw new IllegalArgumentException("can't find job which id is " + jobId); + }); + jobInfoDO.setStatus(status.getV()); + jobInfoRepository.saveAndFlush(jobInfoDO); + + // TODO: 关闭秒级任务 + + } + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java index 7d0bf4e9..f948d30a 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java @@ -1,6 +1,6 @@ package com.github.kfcfans.oms.server.web.controller; -import com.github.kfcfans.oms.server.core.akka.OhMyServer; +import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; import com.github.kfcfans.oms.server.service.ha.ServerSelectService; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java index fd9e7915..05884bed 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java @@ -112,6 +112,8 @@ public class CommonTaskTracker extends TaskTracker { req.setSucceedTaskNum(holder.succeedNum); req.setFailedTaskNum(holder.failedNum); req.setReportTime(System.currentTimeMillis()); + req.setSourceAddress(OhMyWorker.getWorkerAddress()); + // 2. 如果未完成任务数为0,判断是否真正结束,并获取真正结束任务的执行结果 TaskDO resultTask = null; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java index 9e8ef59a..9c2c3299 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java @@ -42,6 +42,8 @@ public class FrequentTaskTracker extends TaskTracker { // 时间表达式类型 private TimeExpressionType timeExpressionType; private long timeParams; + // 最大同时运行实例数 + private int maxInstanceNum; // 总运行次数(正常情况不会出现锁竞争,直接用 Atomic 系列,锁竞争验证推荐 LongAdder) private AtomicLong triggerTimes; @@ -68,6 +70,7 @@ public class FrequentTaskTracker extends TaskTracker { // 0. 初始化实例变量 timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); timeParams = Long.parseLong(req.getTimeExpression()); + maxInstanceNum = req.getMaxInstanceNum(); triggerTimes = new AtomicLong(0); succeedTimes = new AtomicLong(0); @@ -133,8 +136,16 @@ public class FrequentTaskTracker extends TaskTracker { newRootTask.setCreatedTime(System.currentTimeMillis()); newRootTask.setLastModifiedTime(System.currentTimeMillis()); + // 判断是否超出最大执行实例数 + if (timeExpressionType == TimeExpressionType.FIX_RATE) { + if (subInstanceId2TimeHolder.size() > maxInstanceNum) { + log.error("[TaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId); + processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE"); + return; + } + } - // 必须先持久化,持久化成功才能 Dispatch,否则会导致后续报错(因为DB中没有这个taskId对应的记录,会各种报错) + // 必须先持久化,持久化成功才能 dispatch,否则会导致后续报错(因为DB中没有这个taskId对应的记录,会各种报错) if (!taskPersistenceService.save(newRootTask)) { log.error("[TaskTracker-{}] Launcher create new root task failed.", instanceId); processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED"); @@ -259,6 +270,7 @@ public class FrequentTaskTracker extends TaskTracker { req.setSucceedTaskNum(succeedTimes.get()); req.setFailedTaskNum(failedTimes.get()); req.setReportTime(System.currentTimeMillis()); + req.setSourceAddress(OhMyWorker.getWorkerAddress()); String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);