diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java index 6ef04e1f..da1837dd 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/InstanceStatus.java @@ -16,12 +16,12 @@ import java.util.List; @AllArgsConstructor public enum InstanceStatus { - WAITING_DISPATCH(1, "等待任务派发"), + WAITING_DISPATCH(1, "等待派发"), WAITING_WORKER_RECEIVE(2, "等待Worker接收"), RUNNING(3, "运行中"), - FAILED(4, "任务运行失败"), - SUCCEED(5, "任务运行成功"), - STOPPED(10, "任务被手动停止"); + FAILED(4, "失败"), + SUCCEED(5, "成功"), + STOPPED(10, "手动停止"); private int v; private String des; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java index f246f5fb..6a9438ec 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/RemoteConstant.java @@ -32,5 +32,5 @@ public class RemoteConstant { /* ************************ OTHERS ************************ */ public static final String EMPTY_ADDRESS = "N/A"; - public static final long DEFAULT_TIMEOUT_MS = 5000; + public static final long DEFAULT_TIMEOUT_MS = 3000; } diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java index 07543192..ed3e4b61 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceDetail.java @@ -14,9 +14,9 @@ import java.io.Serializable; public class InstanceDetail implements Serializable { // 任务整体开始时间 - private long actualTriggerTime; + private Long actualTriggerTime; // 任务整体结束时间(可能不存在) - private long finishedTime; + private Long finishedTime; // 任务状态(中文) private String status; // 任务执行结果(可能不存在) diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java index 42808dc8..761c53bc 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/SystemMetrics.java @@ -15,17 +15,19 @@ public class SystemMetrics implements Serializable, Comparable { // CPU核心数量 private int cpuProcessors; - // CPU负载 + // CPU负载(需要处以核心数) private double cpuLoad; // 内存(单位 GB) private double jvmUsedMemory; - private double jvmTotalMemory; private double jvmMaxMemory; + // 内存占用(0.X,非百分比) + private double jvmMemoryUsage; // 磁盘(单位 GB) private double diskUsed; private double diskTotal; + // 磁盘占用(0.X,非百分比) private double diskUsage; // 缓存分数 diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java index 88e5b7f4..8eb92fdb 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java @@ -19,6 +19,10 @@ public class ServerScheduleJobReq implements Serializable { /* *********************** 任务相关属性 *********************** */ + /** + * 任务ID,当更换Server后需要根据 JobId 重新查询任务元数据 + */ + private Long jobId; /** * 基础信息 */ diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java index 620e6767..1667fea9 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/FriendActor.java @@ -1,10 +1,15 @@ package com.github.kfcfans.oms.server.akka.actors; import akka.actor.AbstractActor; +import com.github.kfcfans.common.model.SystemMetrics; import com.github.kfcfans.common.response.AskResponse; +import com.github.kfcfans.oms.server.akka.requests.FriendQueryWorkerClusterStatusReq; import com.github.kfcfans.oms.server.akka.requests.Ping; +import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; import lombok.extern.slf4j.Slf4j; +import java.util.Map; + /** * 处理朋友们的信息(处理服务器与服务器之间的通讯) * @@ -17,6 +22,7 @@ public class FriendActor extends AbstractActor { public Receive createReceive() { return receiveBuilder() .match(Ping.class, this::onReceivePing) + .match(FriendQueryWorkerClusterStatusReq.class, this::onReceiveFriendQueryWorkerClusterStatusReq) .matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj)) .build(); } @@ -30,4 +36,15 @@ public class FriendActor extends AbstractActor { askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime()); getSender().tell(askResponse, getSelf()); } + + /** + * 处理查询Worker节点的请求 + */ + private void onReceiveFriendQueryWorkerClusterStatusReq(FriendQueryWorkerClusterStatusReq req) { + Map workerInfo = WorkerManagerService.getActiveWorkerInfo(req.getAppId()); + AskResponse askResponse = new AskResponse(); + askResponse.setSuccess(true); + askResponse.setExtra(workerInfo); + getSender().tell(askResponse, getSelf()); + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/FriendQueryWorkerClusterStatusReq.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/FriendQueryWorkerClusterStatusReq.java new file mode 100644 index 00000000..2ac25c5a --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/requests/FriendQueryWorkerClusterStatusReq.java @@ -0,0 +1,20 @@ +package com.github.kfcfans.oms.server.akka.requests; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 查询 Worker 集群状态 + * + * @author tjq + * @since 2020/4/14 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class FriendQueryWorkerClusterStatusReq implements Serializable { + private Long appId; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java index e26108ff..51fadc92 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java @@ -53,4 +53,7 @@ public interface InstanceLogRepository extends JpaRepository findByAppId(long appId, Pageable pageable); + Page findByJobId(long jobId, Pageable pageable); + // 只会有一条数据,只是为了统一 + Page findByInstanceId(long instanceId, Pageable pageable); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java index 7a09efa7..6e21cad4 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/JobInfoRepository.java @@ -5,6 +5,7 @@ import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import java.util.Date; import java.util.List; /** @@ -21,4 +22,9 @@ public interface JobInfoRepository extends JpaRepository { Page findByAppIdAndStatusNot(Long appId, Pageable pageable, int status); Page findByAppIdAndJobNameLikeAndStatusNot(Long appId, String condition, int status, Pageable pageable); + + + long countByAppId(long appId); + long countByAppIdAndStatus(long appId, int status); + long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java new file mode 100644 index 00000000..c4de915a --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/CacheService.java @@ -0,0 +1,51 @@ +package com.github.kfcfans.oms.server.service; + +import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.time.Duration; +import java.util.Optional; + +/** + * 本地缓存常用数据查询操作 + * + * @author tjq + * @since 2020/4/14 + */ +@Slf4j +@Service +public class CacheService { + + @Resource + private JobInfoRepository jobInfoRepository; + + private final Cache jobId2JobNameCache; + + public CacheService() { + jobId2JobNameCache = CacheBuilder.newBuilder() + .expireAfterWrite(Duration.ofHours(1)) + .maximumSize(1024) + .build(); + } + + /** + * 根据 jobId 查询 jobName(不保证数据一致性,或者说只要改了数据必不一致hhh) + */ + public String getJobName(Long jobId) { + try { + return jobId2JobNameCache.get(jobId, () -> { + Optional jobInfoDOOptional = jobInfoRepository.findById(jobId); + // 防止缓存穿透 hhh + return jobInfoDOOptional.map(JobInfoDO::getJobName).orElse(""); + }); + }catch (Exception e) { + log.error("[CacheService] getJobName for {} failed.", jobId, e); + } + return null; + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java index 0da5c9e4..53272d3e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java @@ -62,13 +62,9 @@ public class ClusterStatusHolder { address2Metrics.forEach((address, metrics) -> { - // 排除超时机器 - Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L); - long timeout = System.currentTimeMillis() - lastActiveTime; - if (timeout > WORKER_TIMEOUT_MS) { + if (timeout(address)) { return; } - // 判断指标 if (metrics.available(minCPUCores, minMemorySpace, minDiskSpace)) { workers.add(address); @@ -88,4 +84,25 @@ public class ClusterStatusHolder { public String getClusterDescription() { return String.format("appName:%s,clusterStatus:%s", appName, address2Metrics.toString()); } + + /** + * 获取当前连接的的机器详情 + * @return map + */ + public Map getActiveWorkerInfo() { + Map res = Maps.newHashMap(); + address2Metrics.forEach((address, metrics) -> { + if (!timeout(address)) { + res.put(address, metrics); + } + }); + return res; + } + + private boolean timeout(String address) { + // 排除超时机器 + Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L); + long timeout = System.currentTimeMillis() - lastActiveTime; + return timeout > WORKER_TIMEOUT_MS; + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java index cbd3ab13..524c2864 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/WorkerManagerService.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.server.service.ha; +import com.github.kfcfans.common.model.SystemMetrics; import com.github.kfcfans.common.request.WorkerHeartbeat; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -36,7 +37,7 @@ public class WorkerManagerService { public static List getSortedAvailableWorker(Long appId, double minCPUCores, double minMemorySpace, double minDiskSpace) { ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); if (clusterStatusHolder == null) { - log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId); + log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId); return Collections.emptyList(); } return clusterStatusHolder.getSortedAvailableWorker(minCPUCores, minMemorySpace, minDiskSpace); @@ -64,5 +65,17 @@ public class WorkerManagerService { return clusterStatusHolder.getClusterDescription(); } + /** + * 获取当前连接到该Server的Worker信息 + * @param appId 应用ID + * @return Worker信息 + */ + public static Map getActiveWorkerInfo(Long appId) { + ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); + if (clusterStatusHolder == null) { + return Collections.emptyMap(); + } + return clusterStatusHolder.getActiveWorkerInfo(); + } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java index a4d0b276..7a042e95 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceService.java @@ -50,6 +50,12 @@ public class InstanceService { log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId); throw new IllegalArgumentException("invalid instanceId: " + instanceId); } + + // 判断状态,只有运行中才能停止 + if (!InstanceStatus.generalizedRunningStatus.contains(instanceLogDO.getStatus())) { + throw new IllegalArgumentException("can't stop finished instance!"); + } + // 更新数据库,将状态置为停止 instanceLogDO.setStatus(STOPPED.getV()); instanceLogDO.setGmtModified(new Date()); @@ -109,20 +115,4 @@ public class InstanceService { return detail; } - /** - * 获取任务实例列表 - * @param appId 应用ID - * @param page 页码 - * @param size 页大小 - * @return 分页对象 - */ - public Page listInstance(long appId, int page, int size) { - - // 按预计触发时间排序 - Sort sort = Sort.by(Sort.Direction.DESC, "expectedTriggerTime"); - PageRequest pageRequest = PageRequest.of(page, size, sort); - - return instanceLogRepository.findByAppId(appId, pageRequest); - } - } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java index 3704764b..75d89ef0 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java @@ -149,7 +149,9 @@ public class JobScheduleService { try { CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); - Date nextTriggerTime = cronExpression.getNextValidTimeAfter(now); + + Date benchmarkTime = new Date(jobInfoDO.getNextTriggerTime()); + Date nextTriggerTime = cronExpression.getNextValidTimeAfter(benchmarkTime); JobInfoDO updatedJobInfo = new JobInfoDO(); BeanUtils.copyProperties(jobInfoDO, updatedJobInfo); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java index 489d6ce9..d5cfd5e6 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/InstanceController.java @@ -5,13 +5,19 @@ import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.common.model.InstanceDetail; import com.github.kfcfans.oms.server.persistence.PageResult; import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; +import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository; +import com.github.kfcfans.oms.server.service.CacheService; import com.github.kfcfans.oms.server.service.instance.InstanceService; +import com.github.kfcfans.oms.server.web.request.QueryInstanceRequest; import com.github.kfcfans.oms.server.web.response.InstanceLogVO; +import com.github.kfcfans.oms.server.web.response.JobInfoVO; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.time.DateFormatUtils; import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Page; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Sort; +import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import java.util.List; @@ -31,6 +37,12 @@ public class InstanceController { @Resource private InstanceService instanceService; + @Resource + private CacheService cacheService; + @Resource + private InstanceLogRepository instanceLogRepository; + + private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; @GetMapping("/stop") public ResultDTO stopInstance(Long instanceId) { @@ -43,19 +55,49 @@ public class InstanceController { return ResultDTO.success(instanceService.getInstanceDetail(instanceId)); } - @GetMapping("/list") - public ResultDTO> list(Long appId, int index, int pageSize) { + @PostMapping("/list") + public ResultDTO> list(@RequestBody QueryInstanceRequest request) { - Page page = instanceService.listInstance(appId, index, pageSize); + Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified"); + PageRequest pageable = PageRequest.of(request.getIndex(), request.getPageSize(), sort); + + // 查询全部数据 + if (request.getJobId() == null && request.getInstanceId() == null) { + return ResultDTO.success(convertPage(instanceLogRepository.findByAppId(request.getAppId(), pageable))); + } + + // 根据JobId查询 + if (request.getJobId() != null) { + return ResultDTO.success(convertPage(instanceLogRepository.findByJobId(request.getJobId(), pageable))); + } + + // 根据InstanceId查询 + return ResultDTO.success(convertPage(instanceLogRepository.findByInstanceId(request.getInstanceId(), pageable))); + } + + private PageResult convertPage(Page page) { List content = page.getContent().stream().map(instanceLogDO -> { InstanceLogVO instanceLogVO = new InstanceLogVO(); BeanUtils.copyProperties(instanceLogDO, instanceLogVO); + + // 状态转化为中文 instanceLogVO.setStatus(InstanceStatus.of(instanceLogDO.getStatus()).getDes()); + // 额外设置任务名称,提高可读性 + instanceLogVO.setJobName(cacheService.getJobName(instanceLogDO.getJobId())); + + // 格式化时间 + instanceLogVO.setActualTriggerTime(DateFormatUtils.format(instanceLogDO.getActualTriggerTime(), TIME_PATTERN)); + if (instanceLogDO.getFinishedTime() == null) { + instanceLogVO.setFinishedTime("N/A"); + }else { + instanceLogVO.setFinishedTime(DateFormatUtils.format(instanceLogDO.getFinishedTime(), TIME_PATTERN)); + } + return instanceLogVO; }).collect(Collectors.toList()); PageResult pageResult = new PageResult<>(page); pageResult.setData(content); - return ResultDTO.success(pageResult); + return pageResult; } } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java new file mode 100644 index 00000000..0e933486 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/SystemInfoController.java @@ -0,0 +1,97 @@ +package com.github.kfcfans.oms.server.web.controller; + +import akka.actor.ActorSelection; +import akka.pattern.Patterns; +import com.github.kfcfans.common.InstanceStatus; +import com.github.kfcfans.common.RemoteConstant; +import com.github.kfcfans.common.model.SystemMetrics; +import com.github.kfcfans.common.response.AskResponse; +import com.github.kfcfans.common.response.ResultDTO; +import com.github.kfcfans.oms.server.akka.OhMyServer; +import com.github.kfcfans.oms.server.akka.requests.FriendQueryWorkerClusterStatusReq; +import com.github.kfcfans.oms.server.persistence.model.AppInfoDO; +import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository; +import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository; +import com.github.kfcfans.oms.server.web.response.SystemOverviewVO; +import com.github.kfcfans.oms.server.web.response.WorkerStatusVO; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.time.DateUtils; +import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +/** + * 系统信息控制器(服务于前端首页) + * + * @author tjq + * @since 2020/4/14 + */ +@RestController +@RequestMapping("/system") +public class SystemInfoController { + + @Resource + private AppInfoRepository appInfoRepository; + @Resource + private JobInfoRepository jobInfoRepository; + + @GetMapping("/listWorker") + @SuppressWarnings("unchecked") + public ResultDTO> listWorker(Long appId) { + Optional appInfoOpt = appInfoRepository.findById(appId); + if (!appInfoOpt.isPresent()) { + return ResultDTO.failed("unknown appId of " +appId); + } + String server =appInfoOpt.get().getCurrentServer(); + + // 没有Server + if (StringUtils.isEmpty(server)) { + return ResultDTO.success(Collections.emptyList()); + } + + // 重定向到指定 Server 获取集群信息 + FriendQueryWorkerClusterStatusReq req = new FriendQueryWorkerClusterStatusReq(appId); + try { + ActorSelection friendActor = OhMyServer.getFriendActor(server); + CompletionStage askCS = Patterns.ask(friendActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + if (askResponse.isSuccess()) { + Map address2Info = (Map) askResponse.getExtra(); + List result = Lists.newLinkedList(); + address2Info.forEach((address, metrics) -> { + WorkerStatusVO info = new WorkerStatusVO(address, metrics); + result.add(info); + }); + return ResultDTO.success(result); + } + return ResultDTO.failed(String.valueOf(askResponse.getExtra())); + }catch (Exception e) { + return ResultDTO.failed("no worker or server available"); + } + } + + @GetMapping("/overview") + public ResultDTO getSystemOverview(Long appId) { + + SystemOverviewVO overview = new SystemOverviewVO(); + + // 总任务数量 + overview.setJobCount(jobInfoRepository.countByAppId(appId)); + // 运行任务数 + overview.setRunningInstanceCount(jobInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV())); + // 近期失败任务数(24H内) + Date date = DateUtils.addDays(new Date(), -1); + overview.setFailedInstanceCount(jobInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date)); + + return ResultDTO.success(overview); + } + +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/QueryInstanceRequest.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/QueryInstanceRequest.java new file mode 100644 index 00000000..0c739065 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/request/QueryInstanceRequest.java @@ -0,0 +1,24 @@ +package com.github.kfcfans.oms.server.web.request; + +import lombok.Data; + +/** + * 任务实例查询对象 + * + * @author tjq + * @since 2020/4/14 + */ +@Data +public class QueryInstanceRequest { + + // 任务所属应用ID + private Long appId; + // 当前页码 + private Integer index; + // 页大小 + private Integer pageSize; + + // 查询条件 + private Long instanceId; + private Long jobId; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java index 9881ac9e..e8de401e 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/InstanceLogVO.java @@ -15,28 +15,24 @@ public class InstanceLogVO { // 任务ID private Long jobId; - // 任务所属应用的ID,冗余提高查询效率 - private Long appId; + // 任务名称 + private String jobName; // 任务实例ID private Long instanceId; // 执行结果 private String result; - // 预计触发时间 - private Long expectedTriggerTime; - // 实际触发时间 - private Long actualTriggerTime; - // 结束时间 - private Long finishedTime; + // TaskTracker地址 private String taskTrackerAddress; // 总共执行的次数(用于重试判断) private Long runningTimes; - private Date gmtCreate; - private Date gmtModified; - /* ********** 不一致区域 ********** */ private String status; + // 实际触发时间(需要格式化为人看得懂的时间) + private String actualTriggerTime; + // 结束时间(同理,需要格式化) + private String finishedTime; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/SystemOverviewVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/SystemOverviewVO.java new file mode 100644 index 00000000..e52f744a --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/SystemOverviewVO.java @@ -0,0 +1,16 @@ +package com.github.kfcfans.oms.server.web.response; + +import lombok.Data; + +/** + * 系统概览 + * + * @author tjq + * @since 2020/4/14 + */ +@Data +public class SystemOverviewVO { + private long jobCount; + private long runningInstanceCount; + private long failedInstanceCount; +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkerStatusVO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkerStatusVO.java new file mode 100644 index 00000000..8783517d --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/response/WorkerStatusVO.java @@ -0,0 +1,61 @@ +package com.github.kfcfans.oms.server.web.response; + +import com.github.kfcfans.common.model.SystemMetrics; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.text.DecimalFormat; + +/** + * Worker机器状态 + * + * @author tjq + * @since 2020/4/14 + */ +@Data +@NoArgsConstructor +public class WorkerStatusVO { + + private String address; + private String cpuLoad; + private String memoryLoad; + private String diskLoad; + + // 1 -> 健康,绿色,2 -> 一般,橙色,3 -> 糟糕,红色 + private int status; + + // 12.3%(4 cores) + private static final String CPU_FORMAT = "%s%%(%d cores)"; + // 27.7%(2.9/8.0 GB) + private static final String OTHER_FORMAT = "%s%%(%s/%s GB)"; + private static final DecimalFormat df = new DecimalFormat("#.#"); + + private static final double threshold = 0.8; + + public WorkerStatusVO(String address, SystemMetrics systemMetrics) { + this.address = address; + + String cpuL = df.format(systemMetrics.getCpuLoad() * 100); + this.cpuLoad = String.format(CPU_FORMAT, cpuL, systemMetrics.getCpuProcessors()); + + + String menL = df.format(systemMetrics.getJvmMemoryUsage() * 100); + String menUsed = df.format(systemMetrics.getJvmUsedMemory()); + String menMax = df.format(systemMetrics.getJvmMaxMemory()); + this.memoryLoad = String.format(OTHER_FORMAT, menL, menUsed, menMax); + + String diskL = df.format(systemMetrics.getDiskUsage() * 100); + String diskUsed = df.format(systemMetrics.getDiskUsed()); + String diskMax = df.format(systemMetrics.getDiskTotal()); + this.diskLoad = String.format(OTHER_FORMAT, diskL, diskUsed, diskMax); + + + if (systemMetrics.getCpuLoad() < threshold && systemMetrics.getDiskUsage() < threshold && systemMetrics.getJvmMemoryUsage() < threshold) { + status = 1; + }else if (systemMetrics.getCpuLoad() > threshold && systemMetrics.getDiskUsage() > threshold && systemMetrics.getJvmMemoryUsage() > threshold) { + status = 3; + }else { + status = 2; + } + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SystemInfoUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SystemInfoUtils.java index 998a33fb..656d4352 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SystemInfoUtils.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SystemInfoUtils.java @@ -24,12 +24,14 @@ public class SystemInfoUtils { // CPU 信息 metrics.setCpuProcessors(osMXBean.getAvailableProcessors()); - metrics.setCpuLoad(osMXBean.getSystemLoadAverage()); + metrics.setCpuLoad(osMXBean.getSystemLoadAverage() / osMXBean.getAvailableProcessors()); // JVM内存信息(maxMemory指JVM能从操作系统获取的最大内存,即-Xmx参数设置的值,totalMemory指JVM当前持久的总内存) metrics.setJvmMaxMemory(bytes2GB(runtime.maxMemory())); - metrics.setJvmTotalMemory(bytes2GB(runtime.totalMemory())); - metrics.setJvmUsedMemory(metrics.getJvmTotalMemory() - bytes2GB(runtime.freeMemory())); + // 已使用内存:当前申请总量 - 当前空余量 + metrics.setJvmUsedMemory(bytes2GB(runtime.totalMemory() - runtime.freeMemory())); + // 百分比,直接 * 100 + metrics.setJvmMemoryUsage(1.0 * metrics.getJvmUsedMemory() / runtime.maxMemory()); // 磁盘信息 long free = 0; @@ -42,7 +44,7 @@ public class SystemInfoUtils { metrics.setDiskUsed(bytes2GB(total - free)); metrics.setDiskTotal(bytes2GB(total)); - metrics.setDiskUsage(metrics.getDiskUsed() / metrics.getDiskTotal()); + metrics.setDiskUsage(metrics.getDiskUsed() / metrics.getDiskTotal() * 1.0); // 在Worker完成分数计算,减小Server压力 metrics.calculateScore(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java index 384f2d2f..21b76fc6 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/CommonTaskTracker.java @@ -83,6 +83,10 @@ public class CommonTaskTracker extends TaskTracker { * 任务是否超时 */ public boolean isTimeout() { + // 时间不限 + if (instanceInfo.getInstanceTimeoutMS() <= 0) { + return false; + } return System.currentTimeMillis() - createTime > instanceInfo.getInstanceTimeoutMS(); }