diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java index be6b9e0c..6ef04e1f 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java @@ -17,8 +17,8 @@ import java.util.List; public enum InstanceStatus { WAITING_DISPATCH(1, "等待任务派发"), - WAITING_WORKER_RECEIVE(2, "Server已完成任务派发,等待Worker接收"), - RUNNING(3, "Worker接收成功,正在运行任务"), + WAITING_WORKER_RECEIVE(2, "等待Worker接收"), + RUNNING(3, "运行中"), FAILED(4, "任务运行失败"), SUCCEED(5, "任务运行成功"), STOPPED(10, "任务被手动停止"); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceDetail.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java similarity index 75% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceDetail.java rename to oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java index 1af37bd3..07543192 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceDetail.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java @@ -1,7 +1,9 @@ -package com.github.kfcfans.oms.server.service.instance; +package com.github.kfcfans.common.model; import lombok.Data; +import java.io.Serializable; + /** * 任务实例的运行详细信息(对外) * @@ -9,7 +11,7 @@ import lombok.Data; * @since 2020/4/11 */ @Data -public class InstanceDetail { +public class InstanceDetail implements Serializable { // 任务整体开始时间 private long actualTriggerTime; @@ -26,15 +28,17 @@ public class InstanceDetail { // 秒级任务的 extra -> List - private static class SubInstanceDetail { + @Data + public static class SubInstanceDetail implements Serializable { private long startTime; private long finishedTime; - private String status; private String result; + private String status; } // MapReduce 和 Broadcast 任务的 extra -> - private static class ClusterDetail { + @Data + public static class TaskDetail implements Serializable { private long totalTaskNum; private long succeedTaskNum; private long failedTaskNum; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java index 8e31cf2b..42808dc8 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java @@ -31,8 +31,6 @@ public class SystemMetrics implements Serializable, Comparable { // 缓存分数 private int score; - public static final int MIN_SCORE = 1; - @Override public int compareTo(SystemMetrics that) { return this.calculateScore() - that.calculateScore(); @@ -50,21 +48,25 @@ public class SystemMetrics implements Serializable, Comparable { double availableCPUCores = cpuProcessors * cpuLoad; double availableMemory = jvmMaxMemory - jvmUsedMemory; - double availableDisk = diskTotal - diskUsage; - // 保护性判断,Windows下无法获取CPU可用核心数,先固定 0.5 - if (availableCPUCores < 0) { - availableCPUCores = 0.5; - } + // Windows下无法获取CPU可用核心数,值固定为-1 + cpuLoad = Math.max(0, cpuLoad); - // 最低运行标准,1G磁盘 & 0.5G内存 & 一个可用的CPU核心 - if (availableDisk < 1 || availableMemory < 0.5 || availableCPUCores < 0.5) { - score = MIN_SCORE; - } else { - // 磁盘只需要满足最低标准即可 - score = (int) (availableMemory * 2 + availableCPUCores); - } + return (int) (availableMemory * 2 + availableCPUCores); + } - return score; + /** + * 该机器是否可用 + * @param minCPUCores 判断标准之最低可用CPU核心数量 + * @param minMemorySpace 判断标准之最低可用内存 + * @param minDiskSpace 判断标准之最低可用磁盘空间 + * @return 是否可用 + */ + public boolean available(double minCPUCores, double minMemorySpace, double minDiskSpace) { + + double currentCpuCores = Math.max(cpuLoad * cpuProcessors, 0); + double currentMemory = jvmMaxMemory - jvmUsedMemory; + double currentDisk = diskTotal - diskUsed; + return currentCpuCores >= minCPUCores && currentMemory >= minMemorySpace && currentDisk >= minDiskSpace; } } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerQueryInstanceStatusReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerQueryInstanceStatusReq.java index 7d0e10d9..3c900192 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerQueryInstanceStatusReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerQueryInstanceStatusReq.java @@ -1,6 +1,8 @@ package com.github.kfcfans.common.request; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; @@ -11,6 +13,8 @@ import java.io.Serializable; * @since 2020/4/10 */ @Data +@NoArgsConstructor +@AllArgsConstructor public class ServerQueryInstanceStatusReq implements Serializable { private Long instanceId; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/PageResult.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/PageResult.java new file mode 100644 index 00000000..25c0909e --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/PageResult.java @@ -0,0 +1,47 @@ +package com.github.kfcfans.oms.server.persistence; + +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.data.domain.Page; + +import java.io.Serializable; +import java.util.List; + +/** + * 分页对象 + * + * @author tjq + * @since 2020/4/12 + */ +@Data +@NoArgsConstructor +public class PageResult implements Serializable { + + /** + * 当前页数 + */ + private int index; + /** + * 页大小 + */ + private int pageSize; + /** + * 总页数 + */ + private int totalPages; + /** + * 总数据量 + */ + private long totalItems; + /** + * 数据 + */ + private List data; + + public PageResult(Page page) { + index = page.getNumber(); + pageSize = page.getSize(); + totalPages = page.getTotalPages(); + totalItems = page.getTotalElements(); + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/InstanceLogDO.java similarity index 90% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/InstanceLogDO.java index 13b33be8..bb501ace 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/ExecuteLogDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/InstanceLogDO.java @@ -13,8 +13,8 @@ import java.util.Date; */ @Data @Entity -@Table(name = "execute_log", indexes = {@Index(columnList = "jobId")}) -public class ExecuteLogDO { +@Table(name = "instance_log", indexes = {@Index(columnList = "jobId")}) +public class InstanceLogDO { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java index 5789efeb..4ef9c95b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/JobInfoDO.java @@ -65,6 +65,14 @@ public class JobInfoDO { // 下一次调度时间 private Long nextTriggerTime; + /* ************************** 繁忙机器配置 ************************** */ + // 最低CPU核心数量,0代表不限 + private double minCpuCores; + // 最低内存空间,单位 GB,0代表不限 + private double minMemorySpace; + // 最低磁盘空间,单位 GB,0代表不限 + private double minDiskSpace; + private Date gmtCreate; private Date gmtModified; diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/UserInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/UserInfoDO.java new file mode 100644 index 00000000..a01f393a --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/model/UserInfoDO.java @@ -0,0 +1,33 @@ +package com.github.kfcfans.oms.server.persistence.model; + +import lombok.Data; + +import javax.persistence.*; +import java.util.Date; + +/** + * 用户信息表 + * + * @author tjq + * @since 2020/4/12 + */ +@Data +@Entity +@Table(name = "user_info") +public class UserInfoDO { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + private String username; + private String password; + + // 手机号 + private String phone; + // 邮箱地址 + private String email; + + private Date gmtCreate; + private Date gmtModified; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java similarity index 66% rename from oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java rename to oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java index 9bdb9d87..4066936e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/ExecuteLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java @@ -1,6 +1,8 @@ package com.github.kfcfans.oms.server.persistence.repository; -import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; @@ -15,14 +17,14 @@ import java.util.List; * @author tjq * @since 2020/4/1 */ -public interface ExecuteLogRepository extends JpaRepository { +public interface InstanceLogRepository extends JpaRepository { /** * 统计当前JOB有多少实例正在运行 */ long countByJobIdAndStatusIn(long jobId, List status); - List findByJobIdAndStatusIn(long jobId, List status); + List findByJobIdAndStatusIn(long jobId, List status); /** @@ -44,9 +46,11 @@ public interface ExecuteLogRepository extends JpaRepository int update4FrequentJob(long instanceId, int status, long runningTimes); // 状态检查三兄弟,对应 WAITING_DISPATCH 、 WAITING_WORKER_RECEIVE 和 RUNNING 三阶段 - List findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(List jobIds, int status, long time); - List findByJobIdInAndStatusAndActualTriggerTimeLessThan(List jobIds, int status, long time); - List findByJobIdInAndStatusAndGmtModifiedBefore(List jobIds, int status, Date time); + List findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(List jobIds, int status, long time); + List findByJobIdInAndStatusAndActualTriggerTimeLessThan(List jobIds, int status, long time); + List findByJobIdInAndStatusAndGmtModifiedBefore(List jobIds, int status, Date time); - ExecuteLogDO findByInstanceId(long instanceId); + InstanceLogDO findByInstanceId(long instanceId); + + Page findByAppId(long appId, Pageable pageable); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java index 225b9503..65c5d5d9 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java @@ -1,6 +1,8 @@ package com.github.kfcfans.oms.server.persistence.repository; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import java.util.List; @@ -16,5 +18,5 @@ public interface JobInfoRepository extends JpaRepository { List findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List appIds, int status, int timeExpressionType, long time); - List findByAppIdInAndStatusAndTimeExpressionType(List appIds, int status, int timeExpressionType); + Page findByAppId(Long appId, Pageable pageable); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/UserInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/UserInfoRepository.java new file mode 100644 index 00000000..a0257e6e --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/UserInfoRepository.java @@ -0,0 +1,13 @@ +package com.github.kfcfans.oms.server.persistence.repository; + +import com.github.kfcfans.oms.server.persistence.model.UserInfoDO; +import org.springframework.data.jpa.repository.JpaRepository; + +/** + * 用户信息表数据库访问层 + * + * @author tjq + * @since 2020/4/12 + */ +public interface UserInfoRepository extends JpaRepository { +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java index c5b19e34..51376ab5 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/DispatchService.java @@ -5,13 +5,12 @@ import com.github.kfcfans.common.*; import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; -import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; -import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.List; @@ -30,7 +29,7 @@ import static com.github.kfcfans.common.InstanceStatus.*; public class DispatchService { @Resource - private ExecuteLogRepository executeLogRepository; + private InstanceLogRepository instanceLogRepository; private static final String EMPTY_RESULT = ""; @@ -46,31 +45,26 @@ public class DispatchService { log.info("[DispatchService] start to dispatch job: {}.", jobInfo); // 查询当前运行的实例数 long current = System.currentTimeMillis(); - long runningInstanceCount = executeLogRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus); + long runningInstanceCount = instanceLogRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus); // 超出最大同时运行限制,不执行调度 if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount); - executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, result); + instanceLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, result); return; } // 获取 Worker - String taskTrackerAddress = WorkerManagerService.chooseBestWorker(jobInfo.getAppId()); - List allAvailableWorker = WorkerManagerService.getAllAvailableWorker(jobInfo.getAppId()); + List allAvailableWorker = WorkerManagerService.getSortedAvailableWorker(jobInfo.getAppId(), jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace()); - if (StringUtils.isEmpty(taskTrackerAddress)) { + if (CollectionUtils.isEmpty(allAvailableWorker)) { String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId()); log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription); - executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE); + instanceLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE); return; } - // 消除非原子操作带来的潜在不一致 - allAvailableWorker.remove(taskTrackerAddress); - allAvailableWorker.add(taskTrackerAddress); - // 构造请求 ServerScheduleJobReq req = new ServerScheduleJobReq(); BeanUtils.copyProperties(jobInfo, req); @@ -86,11 +80,12 @@ public class DispatchService { req.setThreadConcurrency(jobInfo.getConcurrency()); // 发送请求(不可靠,需要一个后台线程定期轮询状态) + String taskTrackerAddress = allAvailableWorker.get(0); ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress); taskTrackerActor.tell(req, null); log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString()); // 修改状态 - executeLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, EMPTY_RESULT); + instanceLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, EMPTY_RESULT); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java index 383338cf..0da5c9e4 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java @@ -50,74 +50,37 @@ public class ClusterStatusHolder { } } - /** - * 选取状态最好的Worker进行任务派发 - * @return Worker的地址(null代表没有可用的Worker) - */ - public String chooseBestWorker() { - - // 直接对 HashMap 根据Value进行排序 - List> entryList = Lists.newArrayList(address2Metrics.entrySet()); - - // 降序排序(Comparator.comparingInt默认为升序,弃用) - entryList.sort((o1, o2) -> o2.getValue().calculateScore() - o1.getValue().calculateScore()); - - for (Map.Entry entry : address2Metrics.entrySet()) { - String address = entry.getKey(); - if (available(address)) { - return address; - } - } - - log.warn("[ClusterStatusHolder] no worker available for {}, worker status is {}.", appName, address2Metrics); - return null; - } - /** * 获取当前所有可用的 Worker + * @param minCPUCores 最低CPU核心数量 + * @param minMemorySpace 最低内存可用空间,单位GB + * @param minDiskSpace 最低磁盘可用空间,单位GB * @return List */ - public List getAllAvailableWorker() { + public List getSortedAvailableWorker(double minCPUCores, double minMemorySpace, double minDiskSpace) { List workers = Lists.newLinkedList(); - address2Metrics.forEach((address, ignore) -> { - if (available(address)) { + address2Metrics.forEach((address, metrics) -> { + + // 排除超时机器 + Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L); + long timeout = System.currentTimeMillis() - lastActiveTime; + if (timeout > WORKER_TIMEOUT_MS) { + return; + } + + // 判断指标 + if (metrics.available(minCPUCores, minMemorySpace, minDiskSpace)) { workers.add(address); } }); + // 按机器健康度排序 + workers.sort((o1, o2) -> address2Metrics.get(o2).calculateScore() - address2Metrics.get(o1).calculateScore()); + return workers; } - /** - * 某台具体的 Worker 是否可用 - * @param address 需要检测的Worker地址 - * @return 可用状态 - */ - private boolean available(String address) { - SystemMetrics metrics = address2Metrics.get(address); - if (metrics.calculateScore() == SystemMetrics.MIN_SCORE) { - return false; - } - - Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L); - long timeout = System.currentTimeMillis() - lastActiveTime; - return timeout < WORKER_TIMEOUT_MS; - } - - /** - * 整个 Worker 集群是否可用(某个App下的所有机器是否可用) - * @return 有一台机器可用 -> true / 全军覆没 -> false - */ - public boolean available() { - for (String address : address2Metrics.keySet()) { - if (available(address)) { - return true; - } - } - return false; - } - /** * 获取整个集群的简介 * @return 获取集群简介 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java index 7e0afa05..cbd3ab13 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java @@ -1,7 +1,6 @@ package com.github.kfcfans.oms.server.service.ha; import com.github.kfcfans.common.request.WorkerHeartbeat; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; @@ -32,29 +31,15 @@ public class WorkerManagerService { } /** - * 选择状态最好的Worker执行任务 - * @param appId 应用ID - * @return Worker的地址(null代表没有可用的Worker) + * 获取有序的当前所有可用的Worker地址(按得分高低排序,排在前面的健康度更高) */ - public static String chooseBestWorker(Long appId) { - ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); - if (clusterStatusHolder == null) { - log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId); - return null; - } - return clusterStatusHolder.chooseBestWorker(); - } - - /** - * 获取当前所有可用的Worker地址 - */ - public static List getAllAvailableWorker(Long appId) { + public static List getSortedAvailableWorker(Long appId, double minCPUCores, double minMemorySpace, double minDiskSpace) { ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); if (clusterStatusHolder == null) { log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId); return Collections.emptyList(); } - return clusterStatusHolder.getAllAvailableWorker(); + return clusterStatusHolder.getSortedAvailableWorker(minCPUCores, minMemorySpace, minDiskSpace); } /** diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index 55aa37f9..8416ce2f 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -4,9 +4,9 @@ import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.oms.server.common.utils.SpringUtils; -import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; -import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.oms.server.service.DispatchService; import com.google.common.collect.Maps; @@ -33,7 +33,7 @@ public class InstanceManager { // Spring Bean private static DispatchService dispatchService; - private static ExecuteLogRepository executeLogRepository; + private static InstanceLogRepository instanceLogRepository; private static JobInfoRepository jobInfoRepository; /** @@ -90,11 +90,11 @@ public class InstanceManager { // FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行) // 综上,直接把 status 和 runningNum 同步到DB即可 if (timeExpressionType != TimeExpressionType.CRON.getV()) { - getExecuteLogRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum()); + getInstanceLogRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum()); return; } - ExecuteLogDO updateEntity = getExecuteLogRepository().findByInstanceId(instanceId); + InstanceLogDO updateEntity = getInstanceLogRepository().findByInstanceId(instanceId); updateEntity.setStatus(newStatus.getV()); updateEntity.setGmtModified(new Date()); @@ -121,7 +121,7 @@ public class InstanceManager { } // 同步状态变更信息到数据库 - getExecuteLogRepository().saveAndFlush(updateEntity); + getInstanceLogRepository().saveAndFlush(updateEntity); // 清除已完成的实例信息 if (finished) { @@ -153,15 +153,15 @@ public class InstanceManager { return instanceId2StatusHolder.get(instanceId); } - private static ExecuteLogRepository getExecuteLogRepository() { - while (executeLogRepository == null) { + private static InstanceLogRepository getInstanceLogRepository() { + while (instanceLogRepository == null) { try { Thread.sleep(100); }catch (Exception ignore) { } - executeLogRepository = SpringUtils.getBean(ExecuteLogRepository.class); + instanceLogRepository = SpringUtils.getBean(InstanceLogRepository.class); } - return executeLogRepository; + return instanceLogRepository; } private static JobInfoRepository getJobInfoRepository() { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java index 172adf75..a4d0b276 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java @@ -1,20 +1,27 @@ package com.github.kfcfans.oms.server.service.instance; import akka.actor.ActorSelection; +import akka.pattern.Patterns; import com.github.kfcfans.common.InstanceStatus; +import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.SystemInstanceResult; -import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.common.model.InstanceDetail; +import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.common.request.ServerStopInstanceReq; +import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.oms.server.akka.OhMyServer; -import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; -import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; -import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; +import org.springframework.data.domain.*; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.time.Duration; import java.util.Date; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import static com.github.kfcfans.common.InstanceStatus.RUNNING; import static com.github.kfcfans.common.InstanceStatus.STOPPED; @@ -30,9 +37,7 @@ import static com.github.kfcfans.common.InstanceStatus.STOPPED; public class InstanceService { @Resource - private AppInfoRepository appInfoRepository; - @Resource - private ExecuteLogRepository executeLogRepository; + private InstanceLogRepository instanceLogRepository; /** * 停止任务实例 @@ -40,48 +45,84 @@ public class InstanceService { */ public void stopInstance(Long instanceId) { - ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId); - if (executeLogDO == null) { + InstanceLogDO instanceLogDO = instanceLogRepository.findByInstanceId(instanceId); + if (instanceLogDO == null) { log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId); throw new IllegalArgumentException("invalid instanceId: " + instanceId); } // 更新数据库,将状态置为停止 - executeLogDO.setStatus(STOPPED.getV()); - executeLogDO.setGmtModified(new Date()); - executeLogDO.setFinishedTime(System.currentTimeMillis()); - executeLogDO.setResult(SystemInstanceResult.STOPPED_BY_USER); - executeLogRepository.saveAndFlush(executeLogDO); + instanceLogDO.setStatus(STOPPED.getV()); + instanceLogDO.setGmtModified(new Date()); + instanceLogDO.setFinishedTime(System.currentTimeMillis()); + instanceLogDO.setResult(SystemInstanceResult.STOPPED_BY_USER); + instanceLogRepository.saveAndFlush(instanceLogDO); // 停止 TaskTracker - ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(executeLogDO.getTaskTrackerAddress()); + ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceLogDO.getTaskTrackerAddress()); ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId); taskTrackerActor.tell(req, null); } + /** + * 获取任务实例的详细运行详细 + * @param instanceId 任务实例ID + * @return 详细运行状态 + */ public InstanceDetail getInstanceDetail(Long instanceId) { - ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId); - if (executeLogDO == null) { + InstanceLogDO instanceLogDO = instanceLogRepository.findByInstanceId(instanceId); + if (instanceLogDO == null) { log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId); throw new IllegalArgumentException("invalid instanceId: " + instanceId); } - InstanceStatus instanceStatus = InstanceStatus.of(executeLogDO.getStatus()); + InstanceStatus instanceStatus = InstanceStatus.of(instanceLogDO.getStatus()); InstanceDetail detail = new InstanceDetail(); detail.setStatus(instanceStatus.getDes()); // 只要不是运行状态,只需要返回简要信息 if (instanceStatus != RUNNING) { - BeanUtils.copyProperties(executeLogDO, detail); + BeanUtils.copyProperties(instanceLogDO, detail); return detail; } - // 运行状态下,需要分别考虑MapReduce、Broadcast和秒级任务的详细信息 + // 运行状态下,交由 TaskTracker 返回相关信息 + try { + ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId); + ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceLogDO.getTaskTrackerAddress()); + CompletionStage askCS = Patterns.ask(taskTrackerActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (askResponse.isSuccess()) { + return (InstanceDetail) askResponse.getExtra(); + }else { + log.warn("[InstanceService] ask InstanceStatus from TaskTracker failed, the message is {}.", askResponse.getExtra()); + } + }catch (Exception e) { + log.error("[InstanceService] ask InstanceStatus from TaskTracker failed.", e); + } - return null; + // 失败则返回基础版信息 + BeanUtils.copyProperties(instanceLogDO, detail); + return detail; + } + + /** + * 获取任务实例列表 + * @param appId 应用ID + * @param page 页码 + * @param size 页大小 + * @return 分页对象 + */ + public Page listInstance(long appId, int page, int size) { + + // 按预计触发时间排序 + Sort sort = Sort.by(Sort.Direction.DESC, "expectedTriggerTime"); + PageRequest pageRequest = PageRequest.of(page, size, sort); + + return instanceLogRepository.findByAppId(appId, pageRequest); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java index baafa316..c93ddf8b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/InstanceStatusCheckService.java @@ -6,10 +6,10 @@ import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; -import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; -import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.oms.server.service.DispatchService; import com.google.common.base.Stopwatch; @@ -44,7 +44,7 @@ public class InstanceStatusCheckService { @Resource private AppInfoRepository appInfoRepository; @Resource - private ExecuteLogRepository executeLogRepository; + private InstanceLogRepository instanceLogRepository; @Resource private JobInfoRepository jobInfoRepository; @@ -73,7 +73,7 @@ public class InstanceStatusCheckService { // 1. 检查等待 WAITING_DISPATCH 状态的任务 long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS; - List waitingDispatchInstances = executeLogRepository.findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold); + List waitingDispatchInstances = instanceLogRepository.findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold); if (!CollectionUtils.isEmpty(waitingDispatchInstances)) { log.warn("[InstanceStatusCheckService] instances({}) is not triggered as expected.", waitingDispatchInstances); waitingDispatchInstances.forEach(instance -> { @@ -85,7 +85,7 @@ public class InstanceStatusCheckService { // 2. 检查 WAITING_WORKER_RECEIVE 状态的任务 threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS; - List waitingWorkerReceiveInstances = executeLogRepository.findByJobIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold); + List waitingWorkerReceiveInstances = instanceLogRepository.findByJobIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold); if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) { log.warn("[InstanceStatusCheckService] instances({}) did n’t receive any reply from worker.", waitingWorkerReceiveInstances); waitingWorkerReceiveInstances.forEach(instance -> { @@ -97,7 +97,7 @@ public class InstanceStatusCheckService { // 3. 检查 RUNNING 状态的任务(一定时间没收到 TaskTracker 的状态报告,视为失败) threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS; - List failedInstances = executeLogRepository.findByJobIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold)); + List failedInstances = instanceLogRepository.findByJobIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold)); if (!CollectionUtils.isEmpty(failedInstances)) { log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances); failedInstances.forEach(instance -> { @@ -133,11 +133,11 @@ public class InstanceStatusCheckService { /** * 处理上报超时而失败的任务实例 */ - private void updateFailedInstance(ExecuteLogDO instance) { + private void updateFailedInstance(InstanceLogDO instance) { instance.setStatus(InstanceStatus.FAILED.getV()); instance.setFinishedTime(System.currentTimeMillis()); instance.setGmtModified(new Date()); instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); - executeLogRepository.saveAndFlush(instance); + instanceLogRepository.saveAndFlush(instance); } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java index 38568347..3704764b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java @@ -7,11 +7,11 @@ import com.github.kfcfans.oms.server.common.utils.CronExpression; import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; -import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; -import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import com.github.kfcfans.oms.server.service.DispatchService; import com.github.kfcfans.oms.server.service.IdGenerateService; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; @@ -52,7 +52,7 @@ public class JobScheduleService { @Resource private JobInfoRepository jobInfoRepository; @Resource - private ExecuteLogRepository executeLogRepository; + private InstanceLogRepository instanceLogRepository; private static final long SCHEDULE_RATE = 5000; @@ -103,10 +103,10 @@ public class JobScheduleService { Map jobId2InstanceId = Maps.newHashMap(); log.info("[JobScheduleService] These cron jobs will be scheduled: {}.", jobInfos); - List executeLogs = Lists.newLinkedList(); + List executeLogs = Lists.newLinkedList(); jobInfos.forEach(jobInfoDO -> { - ExecuteLogDO executeLog = new ExecuteLogDO(); + InstanceLogDO executeLog = new InstanceLogDO(); executeLog.setJobId(jobInfoDO.getId()); executeLog.setAppId(jobInfoDO.getAppId()); executeLog.setInstanceId(IdGenerateService.allocate()); @@ -119,8 +119,8 @@ public class JobScheduleService { jobId2InstanceId.put(executeLog.getJobId(), executeLog.getInstanceId()); }); - executeLogRepository.saveAll(executeLogs); - executeLogRepository.flush(); + instanceLogRepository.saveAll(executeLogs); + instanceLogRepository.flush(); // 2. 推入时间轮中等待调度执行 jobInfos.forEach(jobInfoDO -> { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java index 72af4a89..489d6ce9 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java @@ -1,13 +1,22 @@ package com.github.kfcfans.oms.server.web.controller; +import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.response.ResultDTO; -import com.github.kfcfans.oms.server.service.instance.InstanceDetail; +import com.github.kfcfans.common.model.InstanceDetail; +import com.github.kfcfans.oms.server.persistence.PageResult; +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; import com.github.kfcfans.oms.server.service.instance.InstanceService; +import com.github.kfcfans.oms.server.web.response.InstanceLogVO; +import org.springframework.beans.BeanUtils; +import org.springframework.data.domain.Page; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** @@ -33,4 +42,20 @@ public class InstanceController { public ResultDTO getRunningStatus(Long instanceId) { return ResultDTO.success(instanceService.getInstanceDetail(instanceId)); } + + @GetMapping("/list") + public ResultDTO> list(Long appId, int index, int pageSize) { + + Page page = instanceService.listInstance(appId, index, pageSize); + List content = page.getContent().stream().map(instanceLogDO -> { + InstanceLogVO instanceLogVO = new InstanceLogVO(); + BeanUtils.copyProperties(instanceLogDO, instanceLogVO); + instanceLogVO.setStatus(InstanceStatus.of(instanceLogDO.getStatus()).getDes()); + return instanceLogVO; + }).collect(Collectors.toList()); + + PageResult pageResult = new PageResult<>(page); + pageResult.setData(content); + return ResultDTO.success(pageResult); + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java index f8563013..269f6a6c 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java @@ -6,8 +6,9 @@ import com.github.kfcfans.common.ProcessorType; import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.utils.CronExpression; -import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; -import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.persistence.PageResult; +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; @@ -15,8 +16,12 @@ import com.github.kfcfans.oms.server.service.DispatchService; import com.github.kfcfans.oms.server.service.IdGenerateService; import com.github.kfcfans.oms.server.service.instance.InstanceService; import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest; +import com.github.kfcfans.oms.server.web.response.JobInfoVO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Sort; import org.springframework.util.CollectionUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; @@ -27,6 +32,7 @@ import javax.annotation.Resource; import java.util.Date; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; /** * 任务信息管理 Controller @@ -47,7 +53,7 @@ public class JobController { @Resource private JobInfoRepository jobInfoRepository; @Resource - private ExecuteLogRepository executeLogRepository; + private InstanceLogRepository instanceLogRepository; @PostMapping("/save") public ResultDTO saveJobInfo(ModifyJobInfoRequest request) throws Exception { @@ -104,11 +110,28 @@ public class JobController { return ResultDTO.success(null); } + @GetMapping("/list") + public ResultDTO> listJobs(Long appId, int index, int pageSize) { + + Sort sort = Sort.by(Sort.Direction.DESC, "gmtCreate"); + PageRequest pageRequest = PageRequest.of(index, pageSize, sort); + Page jobInfoPage = jobInfoRepository.findByAppId(appId, pageRequest); + List jobInfoVOList = jobInfoPage.getContent().stream().map(jobInfoDO -> { + JobInfoVO jobInfoVO = new JobInfoVO(); + BeanUtils.copyProperties(jobInfoDO, jobInfoVO); + return jobInfoVO; + }).collect(Collectors.toList()); + + PageResult pageResult = new PageResult<>(jobInfoPage); + pageResult.setData(jobInfoVOList); + return ResultDTO.success(pageResult); + } + /** * 立即运行JOB */ private void runJobImmediately(JobInfoDO jobInfoDO) { - ExecuteLogDO executeLog = new ExecuteLogDO(); + InstanceLogDO executeLog = new InstanceLogDO(); executeLog.setJobId(jobInfoDO.getId()); executeLog.setAppId(jobInfoDO.getAppId()); executeLog.setInstanceId(IdGenerateService.allocate()); @@ -117,7 +140,7 @@ public class JobController { executeLog.setGmtCreate(new Date()); executeLog.setGmtModified(executeLog.getGmtCreate()); - executeLogRepository.saveAndFlush(executeLog); + instanceLogRepository.saveAndFlush(executeLog); dispatchService.dispatch(jobInfoDO, executeLog.getInstanceId(), 0); } @@ -141,7 +164,7 @@ public class JobController { if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) { return; } - List executeLogs = executeLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus); + List executeLogs = instanceLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus); if (CollectionUtils.isEmpty(executeLogs)) { return; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/UserInfoController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/UserInfoController.java new file mode 100644 index 00000000..8c6a0d8c --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/UserInfoController.java @@ -0,0 +1,58 @@ +package com.github.kfcfans.oms.server.web.controller; + +import com.github.kfcfans.common.response.ResultDTO; +import com.github.kfcfans.oms.server.persistence.model.UserInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.UserInfoRepository; +import com.github.kfcfans.oms.server.web.request.ModifyUserInfoRequest; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.springframework.beans.BeanUtils; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +/** + * 用户信息控制层 + * + * @author tjq + * @since 2020/4/12 + */ +@RestController +@RequestMapping("/user") +public class UserInfoController { + + @Resource + private UserInfoRepository userInfoRepository; + + @PostMapping("save") + public ResultDTO save(ModifyUserInfoRequest request) { + UserInfoDO userInfoDO = new UserInfoDO(); + BeanUtils.copyProperties(request, userInfoDO); + userInfoDO.setGmtCreate(new Date()); + userInfoDO.setGmtModified(userInfoDO.getGmtCreate()); + userInfoRepository.saveAndFlush(userInfoDO); + return ResultDTO.success(null); + } + + @GetMapping("list") + public ResultDTO> list() { + List result = userInfoRepository.findAll().stream().map(x -> new UserItemVO(x.getId(), x.getUsername())).collect(Collectors.toList()); + return ResultDTO.success(result); + } + + + @Getter + @NoArgsConstructor + @AllArgsConstructor + public static final class UserItemVO { + private Long id; + private String username; + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java index 3ce664b4..3a475200 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyJobInfoRequest.java @@ -56,6 +56,14 @@ public class ModifyJobInfoRequest { private Integer instanceRetryNum; private Integer taskRetryNum; + /* ************************** 繁忙机器配置 ************************** */ + // 最低CPU核心数量,0代表不限 + private double minCpuCores; + // 最低内存空间,单位 GB,0代表不限 + private double minMemorySpace; + // 最低磁盘空间,单位 GB,0代表不限 + private double minDiskSpace; + // 1 正常运行,2 停止(不再调度) private Integer status; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyUserInfoRequest.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyUserInfoRequest.java new file mode 100644 index 00000000..e395e7cb --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/ModifyUserInfoRequest.java @@ -0,0 +1,23 @@ +package com.github.kfcfans.oms.server.web.request; + +import lombok.Data; + +/** + * 创建/修改 UserInfo 请求 + * + * @author tjq + * @since 2020/4/12 + */ +@Data +public class ModifyUserInfoRequest { + + private Long id; + + private String username; + private String password; + + // 手机号 + private String phone; + // 邮箱地址 + private String email; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java new file mode 100644 index 00000000..9881ac9e --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java @@ -0,0 +1,42 @@ +package com.github.kfcfans.oms.server.web.response; + +import lombok.Data; + +import java.util.Date; + +/** + * ExecuteLog 对外展示对象 + * + * @author tjq + * @since 2020/4/12 + */ +@Data +public class InstanceLogVO { + + // 任务ID + private Long jobId; + // 任务所属应用的ID,冗余提高查询效率 + private Long appId; + // 任务实例ID + private Long instanceId; + + // 执行结果 + private String result; + // 预计触发时间 + private Long expectedTriggerTime; + // 实际触发时间 + private Long actualTriggerTime; + // 结束时间 + private Long finishedTime; + // TaskTracker地址 + private String taskTrackerAddress; + + // 总共执行的次数(用于重试判断) + private Long runningTimes; + + private Date gmtCreate; + private Date gmtModified; + + /* ********** 不一致区域 ********** */ + private String status; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/JobInfoVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/JobInfoVO.java new file mode 100644 index 00000000..1b12f8f7 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/JobInfoVO.java @@ -0,0 +1,71 @@ +package com.github.kfcfans.oms.server.web.response; + +import lombok.Data; + +import java.util.Date; + +/** + * JobInfo 对外展示对象 + * + * @author tjq + * @since 2020/4/12 + */ +@Data +public class JobInfoVO { + + private Long id; + + /* ************************** 任务基本信息 ************************** */ + // 任务名称 + private String jobName; + // 任务描述 + private String jobDescription; + // 任务所属的应用ID + private Long appId; + // 任务自带的参数 + private String jobParams; + // 任务实例的参数(API触发专用) + private String instanceParams; + + /* ************************** 定时参数 ************************** */ + // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) + private Integer timeExpressionType; + // 时间表达式,CRON/NULL/LONG/LONG + private String timeExpression; + + /* ************************** 执行方式 ************************** */ + // 执行类型,单机/广播/MR + private Integer executeType; + // 执行器类型,Java/Shell + private Integer processorType; + // 执行器信息 + private String processorInfo; + + /* ************************** 运行时配置 ************************** */ + // 最大同时运行任务数,默认 1 + private Integer maxInstanceNum; + // 并发度,同时执行某个任务的最大线程数量 + private Integer concurrency; + // 任务整体超时时间 + private Long instanceTimeLimit; + + /* ************************** 重试配置 ************************** */ + private Integer instanceRetryNum; + private Integer taskRetryNum; + + // 1 正常运行,2 停止(不再调度) + private Integer status; + // 下一次调度时间 + private Long nextTriggerTime; + + /* ************************** 繁忙机器配置 ************************** */ + // 最低CPU核心数量,0代表不限 + private double minCpuCores; + // 最低内存空间,单位 GB,0代表不限 + private double minMemorySpace; + // 最低磁盘空间,单位 GB,0代表不限 + private double minDiskSpace; + + private Date gmtCreate; + private Date gmtModified; +} diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java index c779dc64..5d4f886b 100644 --- a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/RepositoryTest.java @@ -3,10 +3,10 @@ package com.github.kfcfans.oms.server.test; import com.github.kfcfans.common.utils.NetUtils; import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.common.TimeExpressionType; -import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO; +import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.model.OmsLockDO; -import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository; import org.assertj.core.util.Lists; @@ -33,7 +33,7 @@ public class RepositoryTest { @Resource private OmsLockRepository omsLockRepository; @Resource - private ExecuteLogRepository executeLogRepository; + private InstanceLogRepository instanceLogRepository; /** * 需要证明批量写入失败后会回滚 @@ -58,17 +58,17 @@ public class RepositoryTest { @Test public void testUpdate() { - ExecuteLogDO updateEntity = new ExecuteLogDO(); + InstanceLogDO updateEntity = new InstanceLogDO(); updateEntity.setId(22L); updateEntity.setActualTriggerTime(System.currentTimeMillis()); updateEntity.setResult("hahaha"); - executeLogRepository.saveAndFlush(updateEntity); + instanceLogRepository.saveAndFlush(updateEntity); } @Test public void testExecuteLogUpdate() { - executeLogRepository.update4Trigger(1586310414570L, 2, 100, System.currentTimeMillis(), "192.168.1.1", "NULL"); - executeLogRepository.update4FrequentJob(1586310419650L, 2, 200); + instanceLogRepository.update4Trigger(1586310414570L, 2, 100, System.currentTimeMillis(), "192.168.1.1", "NULL"); + instanceLogRepository.update4FrequentJob(1586310419650L, 2, 200); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java index 68a349ca..721ba4d2 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerActor.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.worker.actors; import akka.actor.AbstractActor; +import com.github.kfcfans.common.model.InstanceDetail; import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.request.ServerStopInstanceReq; @@ -150,10 +151,17 @@ public class TaskTrackerActor extends AbstractActor { * 查询任务实例运行状态 */ private void onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) { + AskResponse askResponse = new AskResponse(); TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ServerQueryInstanceStatusReq({}) but system can't find TaskTracker.", req); - return; + askResponse.setSuccess(false); + askResponse.setExtra("can't find TaskTracker"); + }else { + InstanceDetail instanceDetail = taskTracker.fetchRunningStatus(); + askResponse.setSuccess(true); + askResponse.setExtra(instanceDetail); } + getSender().tell(askResponse, getSelf()); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java index 5d4b0dd5..384f2d2f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java @@ -5,6 +5,7 @@ import akka.pattern.Patterns; import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.SystemInstanceResult; +import com.github.kfcfans.common.model.InstanceDetail; import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.response.AskResponse; @@ -57,6 +58,26 @@ public class CommonTaskTracker extends TaskTracker { scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS); } + @Override + public InstanceDetail fetchRunningStatus() { + + InstanceDetail detail = new InstanceDetail(); + // 填充基础信息 + detail.setActualTriggerTime(createTime); + detail.setStatus(InstanceStatus.RUNNING.getDes()); + detail.setTaskTrackerAddress(OhMyWorker.getWorkerAddress()); + + // 填充详细信息 + InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); + InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail(); + taskDetail.setSucceedTaskNum(holder.succeedNum); + taskDetail.setFailedTaskNum(holder.failedNum); + taskDetail.setTotalTaskNum(holder.getTotalTaskNum()); + detail.setExtra(taskDetail); + + return detail; + } + /** * 任务是否超时 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java index 3403da93..376e036f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java @@ -5,6 +5,7 @@ import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.common.model.InstanceDetail; import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.oms.worker.OhMyWorker; @@ -17,10 +18,13 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.Executors; @@ -98,6 +102,27 @@ public class FrequentTaskTracker extends TaskTracker { } + @Override + public InstanceDetail fetchRunningStatus() { + InstanceDetail detail = new InstanceDetail(); + // 填充基础信息 + detail.setActualTriggerTime(createTime); + detail.setStatus(InstanceStatus.RUNNING.getDes()); + detail.setTaskTrackerAddress(OhMyWorker.getWorkerAddress()); + + List history = Lists.newLinkedList(); + recentSubInstanceInfo.forEach((ignore, subInstanceInfo) -> { + InstanceDetail.SubInstanceDetail subDetail = new InstanceDetail.SubInstanceDetail(); + BeanUtils.copyProperties(subInstanceInfo, subDetail); + subDetail.setStatus(InstanceStatus.of(subInstanceInfo.status).getDes()); + + history.add(subDetail); + }); + + detail.setExtra(history); + return detail; + } + /** * 任务发射器(@Reference 饥荒->雪球发射器) */ @@ -300,6 +325,7 @@ public class FrequentTaskTracker extends TaskTracker { SubInstanceInfo subInstanceInfo = recentSubInstanceInfo.get(subInstanceId); subInstanceInfo.status = success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV(); subInstanceInfo.result = result; + subInstanceInfo.finishedTime = System.currentTimeMillis(); } // 删除数据库相关数据 taskPersistenceService.deleteAllSubInstanceTasks(instanceId, subInstanceId); @@ -310,9 +336,11 @@ public class FrequentTaskTracker extends TaskTracker { } } + @Data private static class SubInstanceInfo { private int status; private long startTime; + private long finishedTime; private String result; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index 77c56cfa..dfed09dc 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -3,6 +3,7 @@ package com.github.kfcfans.oms.worker.core.tracker.task; import akka.actor.ActorSelection; import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.TimeExpressionType; +import com.github.kfcfans.common.model.InstanceDetail; import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.oms.worker.OhMyWorker; @@ -371,6 +372,10 @@ public abstract class TaskTracker { protected long runningNum; protected long failedNum; protected long succeedNum; + + public long getTotalTaskNum() { + return waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum + failedNum + succeedNum; + } } /** @@ -378,4 +383,10 @@ public abstract class TaskTracker { * @param req 服务器调度任务实例运行请求 */ abstract protected void initTaskTracker(ServerScheduleJobReq req); + + /** + * 查询任务实例的详细运行状态 + * @return 任务实例的详细运行状态 + */ + abstract public InstanceDetail fetchRunningStatus(); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java index 50f95549..f6041332 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorTrackerStatusReportReq.java @@ -5,6 +5,8 @@ import com.github.kfcfans.oms.worker.OhMyWorker; import lombok.Data; import lombok.NoArgsConstructor; +import java.io.Serializable; + /** * ProcessorTracker 定时向 TaskTracker 上报健康状态 * @@ -13,7 +15,7 @@ import lombok.NoArgsConstructor; */ @Data @NoArgsConstructor -public class ProcessorTrackerStatusReportReq { +public class ProcessorTrackerStatusReportReq implements Serializable { private Long instanceId;