From cd49e1ad985f23fc78c545562a859e4b4353e8db Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 11 Apr 2020 12:14:36 +0800 Subject: [PATCH] try to do cluster test --- oh-my-scheduler-common/pom.xml | 8 ++ .../github/kfcfans/common/InstanceStatus.java | 8 +- .../kfcfans/common/SystemInstanceResult.java | 29 ++++++ .../kfcfans/oms/server/akka/OhMyServer.java | 2 +- .../oms/server/akka/actors/FriendActor.java | 60 ------------- .../RedirectServerQueryInstanceStatusReq.java | 17 ---- .../RedirectServerStopInstanceReq.java | 18 ---- .../persistence/model/ExecuteLogDO.java | 2 + .../repository/ExecuteLogRepository.java | 6 +- .../oms/server/service/DispatchService.java | 23 ++--- .../service/ha/ServerSelectService.java | 2 +- .../service/instance/InstanceDetail.java | 42 +++++++++ .../service/instance/InstanceManager.java | 10 ++- .../service/instance/InstanceService.java | 87 ++++++++++++++++++ .../timing/InstanceStatusCheckService.java | 6 +- .../web/controller/InstanceController.java | 58 ++---------- .../server/web/controller/JobController.java | 89 +++++++++++++++---- .../web/controller/ServerController.java | 3 +- .../server/web/controller/package-info.java | 7 -- .../web/request/ModifyJobInfoRequest.java | 2 - oh-my-scheduler-worker/pom.xml | 8 -- .../core/tracker/task/CommonTaskTracker.java | 8 +- .../kfcfans/oms/CommonTaskTrackerTest.java | 6 ++ .../kfcfans/oms/ProcessorTrackerTest.java | 1 - .../com/github/kfcfans/oms/TestUtils.java | 1 - .../processors/TestBroadcastProcessor.java | 7 +- 26 files changed, 297 insertions(+), 213 deletions(-) create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/SystemInstanceResult.java delete mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerQueryInstanceStatusReq.java delete mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerStopInstanceReq.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceDetail.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java delete mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/package-info.java diff --git a/oh-my-scheduler-common/pom.xml b/oh-my-scheduler-common/pom.xml index 03d89bd4..32252222 100644 --- a/oh-my-scheduler-common/pom.xml +++ b/oh-my-scheduler-common/pom.xml @@ -16,6 +16,7 @@ 1.7.30 3.10 + 28.2-jre @@ -32,6 +33,13 @@ commons-lang3 ${commons.lang.version} + + + + com.google.guava + guava + ${guava.version} + \ No newline at end of file diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java index 1bb1f815..be6b9e0c 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java @@ -1,8 +1,11 @@ package com.github.kfcfans.common; +import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Getter; +import java.util.List; + /** * 任务运行状态 * @@ -13,7 +16,7 @@ import lombok.Getter; @AllArgsConstructor public enum InstanceStatus { - WAITING_DISPATCH(1, "等待任务派发,任务处理Server时间轮中"), + WAITING_DISPATCH(1, "等待任务派发"), WAITING_WORKER_RECEIVE(2, "Server已完成任务派发,等待Worker接收"), RUNNING(3, "Worker接收成功,正在运行任务"), FAILED(4, "任务运行失败"), @@ -23,6 +26,9 @@ public enum InstanceStatus { private int v; private String des; + // 广义的运行状态 + public static final List generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v); + public static InstanceStatus of(int v) { for (InstanceStatus is : values()) { if (v == is.v) { diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/SystemInstanceResult.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/SystemInstanceResult.java new file mode 100644 index 00000000..d7d53918 --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/SystemInstanceResult.java @@ -0,0 +1,29 @@ +package com.github.kfcfans.common; + +/** + * 系统生成的任务实例运行结果 + * + * @author tjq + * @since 2020/4/11 + */ +public class SystemInstanceResult { + + // 同时运行的任务实例数过多 + public static final String TOO_MUCH_INSTANCE = "too much instance(%d>%d)"; + // 无可用worker + public static final String NO_WORKER_AVAILABLE = "no worker available"; + // 任务执行超时 + public static final String INSTANCE_EXECUTE_TIMEOUT = "instance execute timeout"; + // 创建根任务失败 + public static final String TASK_INIT_FAILED = "create root task failed"; + // 未知错误 + public static final String UNKNOWN_BUG = "unknown bug"; + // TaskTracker 长时间未上报 + public static final String REPORT_TIMEOUT = "worker report timeout, maybe TaskTracker down"; + + + // 被用户手动停止 + public static final String STOPPED_BY_USER = "stopped by user"; + + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java index 70efa3e2..f8b2b775 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/OhMyServer.java @@ -59,7 +59,7 @@ public class OhMyServer { * @param address IP:port * @return ActorSelection */ - public static ActorSelection getServerActor(String address) { + public static ActorSelection getFriendActor(String address) { String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME); return actorSystem.actorSelection(path); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java index 5ddc9f4c..620e6767 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java @@ -1,25 +1,9 @@ package com.github.kfcfans.oms.server.akka.actors; import akka.actor.AbstractActor; -import akka.pattern.Patterns; -import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; -import com.github.kfcfans.common.request.WorkerHeartbeat; import com.github.kfcfans.common.response.AskResponse; -import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.requests.Ping; -import com.github.kfcfans.oms.server.akka.requests.RedirectServerQueryInstanceStatusReq; -import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq; -import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; -import com.github.kfcfans.oms.server.service.instance.InstanceManager; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; - -import java.io.Serializable; -import java.time.Duration; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - -import static com.github.kfcfans.common.RemoteConstant.DEFAULT_TIMEOUT_MS; /** * 处理朋友们的信息(处理服务器与服务器之间的通讯) @@ -33,7 +17,6 @@ public class FriendActor extends AbstractActor { public Receive createReceive() { return receiveBuilder() .match(Ping.class, this::onReceivePing) - .match(RedirectServerStopInstanceReq.class, this::onReceiveRedirectServerStopInstanceReq) .matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj)) .build(); } @@ -47,47 +30,4 @@ public class FriendActor extends AbstractActor { askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime()); getSender().tell(askResponse, getSelf()); } - - /** - * 处理停止任务实例的请求 - */ - private void onReceiveRedirectServerStopInstanceReq(RedirectServerStopInstanceReq req) { - - Long instanceId = req.getServerStopInstanceReq().getInstanceId(); - String taskTrackerAddress = InstanceManager.getTaskTrackerAddress(instanceId); - - // 非空,发请求停止任务实例 - if (StringUtils.isNotEmpty(taskTrackerAddress)) { - OhMyServer.getTaskTrackerActor(taskTrackerAddress).tell(req.getServerStopInstanceReq(), getSelf()); - return; - } - - // 空,可能刚经历 Server 变更 或 TaskTracker 宕机。先忽略吧,打条日志压压惊 - log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}), so stop instance may fail.", instanceId); - } - - /** - * 处理Server查询任务实例运行情况的请求 - */ - private void onReceiveRedirectServerQueryInstanceStatusReq(RedirectServerQueryInstanceStatusReq req) { - - Long instanceId = req.getReq().getInstanceId(); - String taskTrackerAddress = InstanceManager.getTaskTrackerAddress(instanceId); - AskResponse response = new AskResponse(); - if (StringUtils.isEmpty(taskTrackerAddress)) { - response.setSuccess(false); - response.setExtra("can't find TaskTracker"); - log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}).", instanceId); - }else { - try { - CompletionStage ask = Patterns.ask(OhMyServer.getTaskTrackerActor(taskTrackerAddress), req.getReq(), Duration.ofMillis(DEFAULT_TIMEOUT_MS)); - response = (AskResponse) ask.toCompletableFuture().get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - }catch (Exception e) { - log.warn("[FriendActor] Ask TaskTracker for instance(instanceId={}) status failed.", instanceId, e); - response.setSuccess(false); - response.setExtra(e.getMessage()); - } - } - getSender().tell(response, getSelf()); - } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerQueryInstanceStatusReq.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerQueryInstanceStatusReq.java deleted file mode 100644 index d883e13f..00000000 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerQueryInstanceStatusReq.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.github.kfcfans.oms.server.akka.requests; - -import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq; -import lombok.Data; - -import java.io.Serializable; - -/** - * 重定向 ServerQueryInstanceStatusReq - * - * @author tjq - * @since 2020/4/10 - */ -@Data -public class RedirectServerQueryInstanceStatusReq implements Serializable { - private ServerQueryInstanceStatusReq req; -} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerStopInstanceReq.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerStopInstanceReq.java deleted file mode 100644 index f652d472..00000000 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/RedirectServerStopInstanceReq.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.github.kfcfans.oms.server.akka.requests; - -import com.github.kfcfans.common.request.ServerStopInstanceReq; -import lombok.Data; - -import java.io.Serializable; - -/** - * 重定向 ServerStopInstanceReq - * 被HTTP请求停止任务实例的机器需要将请求转发到该实例对应的Server上处理,由该Server下发到Worker(只有该Server持有Worker的地址信息) - * - * @author tjq - * @since 2020/4/9 - */ -@Data -public class RedirectServerStopInstanceReq implements Serializable { - private ServerStopInstanceReq serverStopInstanceReq; -} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java index 69f38a06..13b33be8 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java @@ -38,6 +38,8 @@ public class ExecuteLogDO { private Long actualTriggerTime; // 结束时间 private Long finishedTime; + // TaskTracker地址 + private String taskTrackerAddress; // 总共执行的次数(用于重试判断) private Long runningTimes; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java index a7be2ced..d43c024e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java @@ -22,7 +22,7 @@ public interface ExecuteLogRepository extends JpaRepository */ long countByJobIdAndStatusIn(long jobId, List status); - List findByJobIdIn(List jobIds); + List findByJobIdAndStatusIn(long jobId, List status); /** @@ -35,8 +35,8 @@ public interface ExecuteLogRepository extends JpaRepository */ @Transactional @Modifying - @Query(value = "update execute_log set status = ?2, running_times = ?3, actual_trigger_time = now(), result = ?4, gmt_modified = now() where instance_id = ?1", nativeQuery = true) - int update4Trigger(long instanceId, int status, long runningTimes, String result); + @Query(value = "update execute_log set status = ?2, running_times = ?3, actual_trigger_time = now(), task_tracker_address = ?4, result = ?5, gmt_modified = now() where instance_id = ?1", nativeQuery = true) + int update4Trigger(long instanceId, int status, long runningTimes, String taskTrackerAddress, String result); @Modifying @Transactional diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java index 7aa4a073..fc738d89 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java @@ -1,9 +1,7 @@ package com.github.kfcfans.oms.server.service; import akka.actor.ActorSelection; -import com.github.kfcfans.common.ExecuteType; -import com.github.kfcfans.common.ProcessorType; -import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.common.*; import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; @@ -37,23 +35,26 @@ public class DispatchService { // 前三个状态都视为运行中 private static final List runningStatus = Lists.newArrayList(WAITING_DISPATCH.getV(), WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()); - private static final String TOO_MUCH_REASON = "too much instance(%d>%d)"; - private static final String NO_WORKER_REASON = "no worker available"; private static final String EMPTY_RESULT = ""; + /** + * 将任务从Server派发到Worker(TaskTracker) + * @param jobInfo 任务的元信息 + * @param instanceId 任务实例ID + * @param currentRunningTimes 当前运行的次数 + */ public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) { - log.info("[DispatchService] start to dispatch job -> {}.", jobInfo); + log.info("[DispatchService] start to dispatch job: {}.", jobInfo); // 查询当前运行的实例数 long runningInstanceCount = executeLogRepository.countByJobIdAndStatusIn(jobInfo.getId(), runningStatus); // 超出最大同时运行限制,不执行调度 if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { - String result = String.format(TOO_MUCH_REASON, runningInstanceCount, jobInfo.getMaxInstanceNum()); + String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); log.warn("[DispatchService] cancel dispatch job({}) due to too much instance(num={}) is running.", jobInfo, runningInstanceCount); - executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, result); - + executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, RemoteConstant.EMPTY_ADDRESS, result); return; } @@ -63,7 +64,7 @@ public class DispatchService { if (StringUtils.isEmpty(taskTrackerAddress)) { log.warn("[DispatchService] cancel dispatch job({}) due to no worker available.", jobInfo); - executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, NO_WORKER_REASON); + executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE); return; } @@ -90,6 +91,6 @@ public class DispatchService { log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString()); // 修改状态 - executeLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, EMPTY_RESULT); + executeLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, taskTrackerAddress, EMPTY_RESULT); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java index 978ce31a..974dfaa0 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ServerSelectService.java @@ -114,7 +114,7 @@ public class ServerSelectService { Ping ping = new Ping(); ping.setCurrentTime(System.currentTimeMillis()); - ActorSelection serverActor = OhMyServer.getServerActor(serverAddress); + ActorSelection serverActor = OhMyServer.getFriendActor(serverAddress); try { CompletionStage askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS)); AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceDetail.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceDetail.java new file mode 100644 index 00000000..1af37bd3 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceDetail.java @@ -0,0 +1,42 @@ +package com.github.kfcfans.oms.server.service.instance; + +import lombok.Data; + +/** + * 任务实例的运行详细信息(对外) + * + * @author tjq + * @since 2020/4/11 + */ +@Data +public class InstanceDetail { + + // 任务整体开始时间 + private long actualTriggerTime; + // 任务整体结束时间(可能不存在) + private long finishedTime; + // 任务状态(中文) + private String status; + // 任务执行结果(可能不存在) + private String result; + // TaskTracker地址 + private String taskTrackerAddress; + + private Object extra; + + + // 秒级任务的 extra -> List + private static class SubInstanceDetail { + private long startTime; + private long finishedTime; + private String status; + private String result; + } + + // MapReduce 和 Broadcast 任务的 extra -> + private static class ClusterDetail { + private long totalTaskNum; + private long succeedTaskNum; + private long failedTaskNum; + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index f4f38bd3..55aa37f9 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -15,6 +15,7 @@ import org.springframework.beans.BeanUtils; import java.util.Date; import java.util.Map; +import java.util.Optional; /** * 管理被调度的服务 @@ -63,8 +64,13 @@ public class InstanceManager { if (!instanceId2JobInfo.containsKey(instanceId)) { log.warn("[InstanceManager] can't find any register info for instance(jobId={},instanceId={}), maybe change the server.", jobId, instanceId); - JobInfoDO JobInfoDo = getJobInfoRepository().findById(jobId).orElseGet(JobInfoDO::new); - instanceId2JobInfo.put(instanceId, JobInfoDo); + Optional jobInfoDOOptional = getJobInfoRepository().findById(jobId); + if (jobInfoDOOptional.isPresent()) { + JobInfoDO JobInfoDo = jobInfoDOOptional.get(); + instanceId2JobInfo.put(instanceId, JobInfoDo); + }else { + throw new IllegalArgumentException("can't find JobIno by jobId:" + jobId); + } } // 更新本地保存的任务实例状态(用于未完成任务前的详细信息查询和缓存加速) diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java new file mode 100644 index 00000000..172adf75 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java @@ -0,0 +1,87 @@ +package com.github.kfcfans.oms.server.service.instance; + +import akka.actor.ActorSelection; +import com.github.kfcfans.common.InstanceStatus; +import com.github.kfcfans.common.SystemInstanceResult; +import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.common.request.ServerStopInstanceReq; +import com.github.kfcfans.oms.server.akka.OhMyServer; +import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; +import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Date; + +import static com.github.kfcfans.common.InstanceStatus.RUNNING; +import static com.github.kfcfans.common.InstanceStatus.STOPPED; + +/** + * 任务运行实例服务 + * + * @author tjq + * @since 2020/4/11 + */ +@Slf4j +@Service +public class InstanceService { + + @Resource + private AppInfoRepository appInfoRepository; + @Resource + private ExecuteLogRepository executeLogRepository; + + /** + * 停止任务实例 + * @param instanceId 任务实例ID + */ + public void stopInstance(Long instanceId) { + + ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId); + if (executeLogDO == null) { + log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId); + throw new IllegalArgumentException("invalid instanceId: " + instanceId); + } + // 更新数据库,将状态置为停止 + executeLogDO.setStatus(STOPPED.getV()); + executeLogDO.setGmtModified(new Date()); + executeLogDO.setFinishedTime(System.currentTimeMillis()); + executeLogDO.setResult(SystemInstanceResult.STOPPED_BY_USER); + executeLogRepository.saveAndFlush(executeLogDO); + + // 停止 TaskTracker + ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(executeLogDO.getTaskTrackerAddress()); + ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId); + taskTrackerActor.tell(req, null); + } + + public InstanceDetail getInstanceDetail(Long instanceId) { + + ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId); + if (executeLogDO == null) { + log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId); + throw new IllegalArgumentException("invalid instanceId: " + instanceId); + } + + InstanceStatus instanceStatus = InstanceStatus.of(executeLogDO.getStatus()); + + InstanceDetail detail = new InstanceDetail(); + detail.setStatus(instanceStatus.getDes()); + + // 只要不是运行状态,只需要返回简要信息 + if (instanceStatus != RUNNING) { + BeanUtils.copyProperties(executeLogDO, detail); + return detail; + } + + // 运行状态下,需要分别考虑MapReduce、Broadcast和秒级任务的详细信息 + + + + return null; + } + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java index 8bce9b01..baafa316 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.server.service.timing; import com.github.kfcfans.common.InstanceStatus; +import com.github.kfcfans.common.SystemInstanceResult; import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.akka.OhMyServer; @@ -129,11 +130,14 @@ public class InstanceStatusCheckService { }); } + /** + * 处理上报超时而失败的任务实例 + */ private void updateFailedInstance(ExecuteLogDO instance) { instance.setStatus(InstanceStatus.FAILED.getV()); instance.setFinishedTime(System.currentTimeMillis()); instance.setGmtModified(new Date()); - instance.setResult("worker report timeout, maybe all worker down"); + instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); executeLogRepository.saveAndFlush(instance); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java index f1b43953..72af4a89 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java @@ -1,25 +1,14 @@ package com.github.kfcfans.oms.server.web.controller; -import akka.actor.ActorSelection; -import com.github.kfcfans.common.InstanceStatus; -import com.github.kfcfans.common.request.ServerStopInstanceReq; import com.github.kfcfans.common.response.ResultDTO; -import com.github.kfcfans.oms.server.akka.OhMyServer; -import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq; -import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; -import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; -import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; -import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; -import org.apache.commons.lang3.StringUtils; +import com.github.kfcfans.oms.server.service.instance.InstanceDetail; +import com.github.kfcfans.oms.server.service.instance.InstanceService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; -import java.util.Date; - -import static com.github.kfcfans.common.InstanceStatus.*; /** * 任务实例 Controller @@ -32,51 +21,16 @@ import static com.github.kfcfans.common.InstanceStatus.*; public class InstanceController { @Resource - private ExecuteLogRepository executeLogRepository; - @Resource - private AppInfoRepository appInfoRepository; + private InstanceService instanceService; @GetMapping("/stop") public ResultDTO stopInstance(Long instanceId) { - - ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId); - if (executeLogDO == null) { - return ResultDTO.failed("invalid instanceId: " + instanceId); - } - // 更新数据库,将状态置为停止 - executeLogDO.setStatus(STOPPED.getV()); - executeLogDO.setGmtModified(new Date()); - executeLogDO.setFinishedTime(System.currentTimeMillis()); - executeLogDO.setResult("STOPPED_BY_USER"); - executeLogRepository.saveAndFlush(executeLogDO); - - // 获取Server地址,准备转发请求 - AppInfoDO appInfoDO = appInfoRepository.findById(executeLogDO.getAppId()).orElse(new AppInfoDO()); - if (StringUtils.isEmpty(appInfoDO.getCurrentServer())) { - return ResultDTO.failed("can't find server"); - } - - // 将请求转发给目标Server(HTTP -> AKKA) - ActorSelection serverActor = OhMyServer.getServerActor(appInfoDO.getCurrentServer()); - RedirectServerStopInstanceReq req = new RedirectServerStopInstanceReq(); - req.setServerStopInstanceReq(new ServerStopInstanceReq(instanceId)); - serverActor.tell(req, null); + instanceService.stopInstance(instanceId); return ResultDTO.success(null); } @GetMapping("/status") - public ResultDTO getRunningStatus(Long instanceId) { - - ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId); - if (executeLogDO == null) { - return ResultDTO.failed("invalid instanceId: " + instanceId); - } - - InstanceStatus status = InstanceStatus.of(executeLogDO.getStatus()); - if (status == FAILED || status == SUCCEED || status == STOPPED) { - - } - - return null; + public ResultDTO getRunningStatus(Long instanceId) { + return ResultDTO.success(instanceService.getInstanceDetail(instanceId)); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java index 34f8bbd5..f8563013 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java @@ -13,8 +13,11 @@ import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.service.DispatchService; import com.github.kfcfans.oms.server.service.IdGenerateService; +import com.github.kfcfans.oms.server.service.instance.InstanceService; import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; +import org.springframework.util.CollectionUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -22,6 +25,7 @@ import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.Date; +import java.util.List; import java.util.Optional; /** @@ -30,12 +34,16 @@ import java.util.Optional; * @author tjq * @since 2020/3/30 */ +@Slf4j @RestController @RequestMapping("job") public class JobController { @Resource private DispatchService dispatchService; + @Resource + private InstanceService instanceService; + @Resource private JobInfoRepository jobInfoRepository; @Resource @@ -68,18 +76,7 @@ public class JobController { // 秒级任务直接调度执行 if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) { - - ExecuteLogDO executeLog = new ExecuteLogDO(); - executeLog.setJobId(jobInfoDO.getId()); - executeLog.setAppId(jobInfoDO.getAppId()); - executeLog.setInstanceId(IdGenerateService.allocate()); - executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); - executeLog.setExpectedTriggerTime(System.currentTimeMillis()); - executeLog.setGmtCreate(new Date()); - executeLog.setGmtModified(executeLog.getGmtCreate()); - - executeLogRepository.saveAndFlush(executeLog); - dispatchService.dispatch(jobInfoDO, executeLog.getInstanceId(), 0); + runJobImmediately(jobInfoDO); } return ResultDTO.success(null); @@ -87,25 +84,79 @@ public class JobController { @GetMapping("/stop") public ResultDTO stopJob(Long jobId) throws Exception { - updateJobStatus(jobId, JobStatus.STOPPED); + shutdownOrStopJob(jobId, JobStatus.STOPPED); return ResultDTO.success(null); } @GetMapping("/delete") public ResultDTO deleteJob(Long jobId) throws Exception { - updateJobStatus(jobId, JobStatus.DELETED); + shutdownOrStopJob(jobId, JobStatus.DELETED); return ResultDTO.success(null); } - private void updateJobStatus(Long jobId, JobStatus status) { - JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> { - throw new IllegalArgumentException("can't find job which id is " + jobId); - }); + @GetMapping("/run") + public ResultDTO runImmediately(Long jobId) { + Optional jobInfoOPT = jobInfoRepository.findById(jobId); + if (!jobInfoOPT.isPresent()) { + throw new IllegalArgumentException("can't find job by jobId:" + jobId); + } + runJobImmediately(jobInfoOPT.get()); + return ResultDTO.success(null); + } + + /** + * 立即运行JOB + */ + private void runJobImmediately(JobInfoDO jobInfoDO) { + ExecuteLogDO executeLog = new ExecuteLogDO(); + executeLog.setJobId(jobInfoDO.getId()); + executeLog.setAppId(jobInfoDO.getAppId()); + executeLog.setInstanceId(IdGenerateService.allocate()); + executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); + executeLog.setExpectedTriggerTime(System.currentTimeMillis()); + executeLog.setGmtCreate(new Date()); + executeLog.setGmtModified(executeLog.getGmtCreate()); + + executeLogRepository.saveAndFlush(executeLog); + dispatchService.dispatch(jobInfoDO, executeLog.getInstanceId(), 0); + } + + /** + * 停止或删除某个JOB + * 秒级任务还要额外停止正在运行的任务实例 + */ + private void shutdownOrStopJob(Long jobId, JobStatus status) throws IllegalArgumentException { + + // 1. 先更新 job_info 表 + Optional jobInfoOPT = jobInfoRepository.findById(jobId); + if (!jobInfoOPT.isPresent()) { + throw new IllegalArgumentException("can't find job by jobId:" + jobId); + } + JobInfoDO jobInfoDO = jobInfoOPT.get(); jobInfoDO.setStatus(status.getV()); jobInfoRepository.saveAndFlush(jobInfoDO); - // TODO: 关闭秒级任务 + // 2. 关闭秒级任务 + TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); + if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) { + return; + } + List executeLogs = executeLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus); + if (CollectionUtils.isEmpty(executeLogs)) { + return; + } + if (executeLogs.size() > 1) { + log.warn("[JobController] frequent job has multi instance, there must ha"); + } + executeLogs.forEach(instance -> { + try { + // 重复查询了数据库,不过问题不大,这个调用量很小 + instanceService.stopInstance(instance.getInstanceId()); + }catch (Exception ignore) { + } + }); } + } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java index f948d30a..9c585833 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/ServerController.java @@ -12,7 +12,8 @@ import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** - * 处理内部请求的 Controller + * 处理Worker请求的 Controller + * Worker启动时,先请求assert验证appName的可用性,再根据得到的appId获取Server地址 * * @author tjq * @since 2020/4/4 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/package-info.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/package-info.java deleted file mode 100644 index 50105a5d..00000000 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/package-info.java +++ /dev/null @@ -1,7 +0,0 @@ -/** - * CRUD较为简单,就不单独搞 Service 层了 - * - * @author tjq - * @since 2020/4/2 - */ -package com.github.kfcfans.oms.server.web.controller; \ No newline at end of file diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java index 92100056..3b2aa9d8 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java @@ -49,8 +49,6 @@ public class ModifyJobInfoRequest { private Integer concurrency; // 任务整体超时时间 private Long instanceTimeLimit; - // 任务的每一个Task超时时间 - private Long taskTimeLimit; /* ************************** 重试配置 ************************** */ private Integer instanceRetryNum; diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml index 3050d3be..e8ed26aa 100644 --- a/oh-my-scheduler-worker/pom.xml +++ b/oh-my-scheduler-worker/pom.xml @@ -19,7 +19,6 @@ 1.0.0-SNAPSHOT 1.4.200 3.4.2 - 28.2-jre 5.6.1 5.0.0-RC5 1.2.68 @@ -63,13 +62,6 @@ ${hikaricp.version} - - - com.google.guava - guava - ${guava.version} - - com.esotericsoftware diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java index 0c1e0922..ff1a9f18 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java @@ -4,6 +4,7 @@ import akka.actor.ActorSelection; import akka.pattern.Patterns; import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.InstanceStatus; +import com.github.kfcfans.common.SystemInstanceResult; import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.response.AskResponse; @@ -21,7 +22,6 @@ import org.springframework.util.CollectionUtils; import java.time.Duration; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.*; @@ -125,7 +125,7 @@ public class CommonTaskTracker extends TaskTracker { if (finishedNum == 0) { finished.set(true); success = false; - result = "CREATE_ROOT_TASK_FAILED"; + result = SystemInstanceResult.TASK_INIT_FAILED; }else { ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); @@ -137,7 +137,7 @@ public class CommonTaskTracker extends TaskTracker { List allTask = taskPersistenceService.getAllTask(instanceId, instanceId); if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) { success = false; - result = "UNKNOWN BUG"; + result = SystemInstanceResult.UNKNOWN_BUG; log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId); }else { result = allTask.get(0).getResult(); @@ -178,7 +178,7 @@ public class CommonTaskTracker extends TaskTracker { if (isTimeout()) { finished.set(true); success = false; - result = "TIMEOUT"; + result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT; } String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java index 97957db0..72929dc6 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/CommonTaskTrackerTest.java @@ -57,4 +57,10 @@ public class CommonTaskTrackerTest { Thread.sleep(5000000); } + @Test + public void testBroadcast() throws Exception { + remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.BROADCAST, TimeExpressionType.CRON), null); + Thread.sleep(5000000); + } + } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java index 1eae22eb..bc73c5e1 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ProcessorTrackerTest.java @@ -72,7 +72,6 @@ public class ProcessorTrackerTest { instanceInfo.setProcessorInfo(processor); instanceInfo.setInstanceTimeoutMS(500000); - instanceInfo.setTaskTimeoutMS(5000000); instanceInfo.setThreadConcurrency(5); instanceInfo.setTaskRetryNum(3); diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TestUtils.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TestUtils.java index 0fae00f3..b51fe967 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TestUtils.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/TestUtils.java @@ -29,7 +29,6 @@ public class TestUtils { req.setTaskRetryNum(3); req.setThreadConcurrency(10); req.setInstanceTimeoutMS(500000); - req.setTaskTimeoutMS(500000); req.setTimeExpressionType(timeExpressionType.name()); switch (timeExpressionType) { case CRON:req.setTimeExpression("0 * * * * ? "); diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java index 8460cfb6..59554343 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/processors/TestBroadcastProcessor.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.processors; +import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.oms.worker.sdk.ProcessResult; import com.github.kfcfans.oms.worker.sdk.TaskContext; import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor; @@ -16,14 +17,14 @@ public class TestBroadcastProcessor implements BroadcastProcessor { @Override public ProcessResult preProcess(TaskContext taskContext) throws Exception { System.out.println("=============== TestBroadcastProcessor#preProcess ==============="); - System.out.println("taskContext:" + taskContext); + System.out.println("taskContext:" + JSONObject.toJSONString(taskContext)); return new ProcessResult(true, "preProcess success"); } @Override public ProcessResult postProcess(TaskContext taskContext, Map taskId2Result) throws Exception { System.out.println("=============== TestBroadcastProcessor#postProcess ==============="); - System.out.println("taskContext:" + taskContext); + System.out.println("taskContext:" + JSONObject.toJSONString(taskContext)); System.out.println("taskId2Result:" + taskId2Result); return new ProcessResult(true, "postProcess success"); } @@ -31,7 +32,7 @@ public class TestBroadcastProcessor implements BroadcastProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { System.out.println("=============== TestBroadcastProcessor#process ==============="); - System.out.println("taskContext:" + context); + System.out.println("taskContext:" + JSONObject.toJSONString(context)); return new ProcessResult(true, "processSuccess"); } }