mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
CRUD...again...
This commit is contained in:
parent
eff1f996be
commit
9dac48beb0
@ -16,12 +16,12 @@ import java.util.List;
|
||||
@AllArgsConstructor
|
||||
public enum InstanceStatus {
|
||||
|
||||
WAITING_DISPATCH(1, "等待任务派发"),
|
||||
WAITING_DISPATCH(1, "等待派发"),
|
||||
WAITING_WORKER_RECEIVE(2, "等待Worker接收"),
|
||||
RUNNING(3, "运行中"),
|
||||
FAILED(4, "任务运行失败"),
|
||||
SUCCEED(5, "任务运行成功"),
|
||||
STOPPED(10, "任务被手动停止");
|
||||
FAILED(4, "失败"),
|
||||
SUCCEED(5, "成功"),
|
||||
STOPPED(10, "手动停止");
|
||||
|
||||
private int v;
|
||||
private String des;
|
||||
|
@ -32,5 +32,5 @@ public class RemoteConstant {
|
||||
|
||||
/* ************************ OTHERS ************************ */
|
||||
public static final String EMPTY_ADDRESS = "N/A";
|
||||
public static final long DEFAULT_TIMEOUT_MS = 5000;
|
||||
public static final long DEFAULT_TIMEOUT_MS = 3000;
|
||||
}
|
||||
|
@ -14,9 +14,9 @@ import java.io.Serializable;
|
||||
public class InstanceDetail implements Serializable {
|
||||
|
||||
// 任务整体开始时间
|
||||
private long actualTriggerTime;
|
||||
private Long actualTriggerTime;
|
||||
// 任务整体结束时间(可能不存在)
|
||||
private long finishedTime;
|
||||
private Long finishedTime;
|
||||
// 任务状态(中文)
|
||||
private String status;
|
||||
// 任务执行结果(可能不存在)
|
||||
|
@ -15,17 +15,19 @@ public class SystemMetrics implements Serializable, Comparable<SystemMetrics> {
|
||||
|
||||
// CPU核心数量
|
||||
private int cpuProcessors;
|
||||
// CPU负载
|
||||
// CPU负载(需要处以核心数)
|
||||
private double cpuLoad;
|
||||
|
||||
// 内存(单位 GB)
|
||||
private double jvmUsedMemory;
|
||||
private double jvmTotalMemory;
|
||||
private double jvmMaxMemory;
|
||||
// 内存占用(0.X,非百分比)
|
||||
private double jvmMemoryUsage;
|
||||
|
||||
// 磁盘(单位 GB)
|
||||
private double diskUsed;
|
||||
private double diskTotal;
|
||||
// 磁盘占用(0.X,非百分比)
|
||||
private double diskUsage;
|
||||
|
||||
// 缓存分数
|
||||
|
@ -19,6 +19,10 @@ public class ServerScheduleJobReq implements Serializable {
|
||||
|
||||
/* *********************** 任务相关属性 *********************** */
|
||||
|
||||
/**
|
||||
* 任务ID,当更换Server后需要根据 JobId 重新查询任务元数据
|
||||
*/
|
||||
private Long jobId;
|
||||
/**
|
||||
* 基础信息
|
||||
*/
|
||||
|
@ -1,10 +1,15 @@
|
||||
package com.github.kfcfans.oms.server.akka.actors;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import com.github.kfcfans.common.model.SystemMetrics;
|
||||
import com.github.kfcfans.common.response.AskResponse;
|
||||
import com.github.kfcfans.oms.server.akka.requests.FriendQueryWorkerClusterStatusReq;
|
||||
import com.github.kfcfans.oms.server.akka.requests.Ping;
|
||||
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 处理朋友们的信息(处理服务器与服务器之间的通讯)
|
||||
*
|
||||
@ -17,6 +22,7 @@ public class FriendActor extends AbstractActor {
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(Ping.class, this::onReceivePing)
|
||||
.match(FriendQueryWorkerClusterStatusReq.class, this::onReceiveFriendQueryWorkerClusterStatusReq)
|
||||
.matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj))
|
||||
.build();
|
||||
}
|
||||
@ -30,4 +36,15 @@ public class FriendActor extends AbstractActor {
|
||||
askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime());
|
||||
getSender().tell(askResponse, getSelf());
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理查询Worker节点的请求
|
||||
*/
|
||||
private void onReceiveFriendQueryWorkerClusterStatusReq(FriendQueryWorkerClusterStatusReq req) {
|
||||
Map<String, SystemMetrics> workerInfo = WorkerManagerService.getActiveWorkerInfo(req.getAppId());
|
||||
AskResponse askResponse = new AskResponse();
|
||||
askResponse.setSuccess(true);
|
||||
askResponse.setExtra(workerInfo);
|
||||
getSender().tell(askResponse, getSelf());
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,20 @@
|
||||
package com.github.kfcfans.oms.server.akka.requests;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 查询 Worker 集群状态
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/14
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class FriendQueryWorkerClusterStatusReq implements Serializable {
|
||||
private Long appId;
|
||||
}
|
@ -53,4 +53,7 @@ public interface InstanceLogRepository extends JpaRepository<InstanceLogDO, Long
|
||||
InstanceLogDO findByInstanceId(long instanceId);
|
||||
|
||||
Page<InstanceLogDO> findByAppId(long appId, Pageable pageable);
|
||||
Page<InstanceLogDO> findByJobId(long jobId, Pageable pageable);
|
||||
// 只会有一条数据,只是为了统一
|
||||
Page<InstanceLogDO> findByInstanceId(long instanceId, Pageable pageable);
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -21,4 +22,9 @@ public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
|
||||
Page<JobInfoDO> findByAppIdAndStatusNot(Long appId, Pageable pageable, int status);
|
||||
|
||||
Page<JobInfoDO> findByAppIdAndJobNameLikeAndStatusNot(Long appId, String condition, int status, Pageable pageable);
|
||||
|
||||
|
||||
long countByAppId(long appId);
|
||||
long countByAppIdAndStatus(long appId, int status);
|
||||
long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time);
|
||||
}
|
||||
|
@ -0,0 +1,51 @@
|
||||
package com.github.kfcfans.oms.server.service;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* 本地缓存常用数据查询操作
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/14
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class CacheService {
|
||||
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
|
||||
private final Cache<Long, String> jobId2JobNameCache;
|
||||
|
||||
public CacheService() {
|
||||
jobId2JobNameCache = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(Duration.ofHours(1))
|
||||
.maximumSize(1024)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 jobId 查询 jobName(不保证数据一致性,或者说只要改了数据必不一致hhh)
|
||||
*/
|
||||
public String getJobName(Long jobId) {
|
||||
try {
|
||||
return jobId2JobNameCache.get(jobId, () -> {
|
||||
Optional<JobInfoDO> jobInfoDOOptional = jobInfoRepository.findById(jobId);
|
||||
// 防止缓存穿透 hhh
|
||||
return jobInfoDOOptional.map(JobInfoDO::getJobName).orElse("");
|
||||
});
|
||||
}catch (Exception e) {
|
||||
log.error("[CacheService] getJobName for {} failed.", jobId, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@ -62,13 +62,9 @@ public class ClusterStatusHolder {
|
||||
|
||||
address2Metrics.forEach((address, metrics) -> {
|
||||
|
||||
// 排除超时机器
|
||||
Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L);
|
||||
long timeout = System.currentTimeMillis() - lastActiveTime;
|
||||
if (timeout > WORKER_TIMEOUT_MS) {
|
||||
if (timeout(address)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 判断指标
|
||||
if (metrics.available(minCPUCores, minMemorySpace, minDiskSpace)) {
|
||||
workers.add(address);
|
||||
@ -88,4 +84,25 @@ public class ClusterStatusHolder {
|
||||
public String getClusterDescription() {
|
||||
return String.format("appName:%s,clusterStatus:%s", appName, address2Metrics.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前连接的的机器详情
|
||||
* @return map
|
||||
*/
|
||||
public Map<String, SystemMetrics> getActiveWorkerInfo() {
|
||||
Map<String, SystemMetrics> res = Maps.newHashMap();
|
||||
address2Metrics.forEach((address, metrics) -> {
|
||||
if (!timeout(address)) {
|
||||
res.put(address, metrics);
|
||||
}
|
||||
});
|
||||
return res;
|
||||
}
|
||||
|
||||
private boolean timeout(String address) {
|
||||
// 排除超时机器
|
||||
Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L);
|
||||
long timeout = System.currentTimeMillis() - lastActiveTime;
|
||||
return timeout > WORKER_TIMEOUT_MS;
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package com.github.kfcfans.oms.server.service.ha;
|
||||
|
||||
import com.github.kfcfans.common.model.SystemMetrics;
|
||||
import com.github.kfcfans.common.request.WorkerHeartbeat;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
@ -36,7 +37,7 @@ public class WorkerManagerService {
|
||||
public static List<String> getSortedAvailableWorker(Long appId, double minCPUCores, double minMemorySpace, double minDiskSpace) {
|
||||
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
|
||||
if (clusterStatusHolder == null) {
|
||||
log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId);
|
||||
log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return clusterStatusHolder.getSortedAvailableWorker(minCPUCores, minMemorySpace, minDiskSpace);
|
||||
@ -64,5 +65,17 @@ public class WorkerManagerService {
|
||||
return clusterStatusHolder.getClusterDescription();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前连接到该Server的Worker信息
|
||||
* @param appId 应用ID
|
||||
* @return Worker信息
|
||||
*/
|
||||
public static Map<String, SystemMetrics> getActiveWorkerInfo(Long appId) {
|
||||
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
|
||||
if (clusterStatusHolder == null) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
return clusterStatusHolder.getActiveWorkerInfo();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -50,6 +50,12 @@ public class InstanceService {
|
||||
log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId);
|
||||
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
|
||||
}
|
||||
|
||||
// 判断状态,只有运行中才能停止
|
||||
if (!InstanceStatus.generalizedRunningStatus.contains(instanceLogDO.getStatus())) {
|
||||
throw new IllegalArgumentException("can't stop finished instance!");
|
||||
}
|
||||
|
||||
// 更新数据库,将状态置为停止
|
||||
instanceLogDO.setStatus(STOPPED.getV());
|
||||
instanceLogDO.setGmtModified(new Date());
|
||||
@ -109,20 +115,4 @@ public class InstanceService {
|
||||
return detail;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取任务实例列表
|
||||
* @param appId 应用ID
|
||||
* @param page 页码
|
||||
* @param size 页大小
|
||||
* @return 分页对象
|
||||
*/
|
||||
public Page<InstanceLogDO> listInstance(long appId, int page, int size) {
|
||||
|
||||
// 按预计触发时间排序
|
||||
Sort sort = Sort.by(Sort.Direction.DESC, "expectedTriggerTime");
|
||||
PageRequest pageRequest = PageRequest.of(page, size, sort);
|
||||
|
||||
return instanceLogRepository.findByAppId(appId, pageRequest);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -149,7 +149,9 @@ public class JobScheduleService {
|
||||
|
||||
try {
|
||||
CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression());
|
||||
Date nextTriggerTime = cronExpression.getNextValidTimeAfter(now);
|
||||
|
||||
Date benchmarkTime = new Date(jobInfoDO.getNextTriggerTime());
|
||||
Date nextTriggerTime = cronExpression.getNextValidTimeAfter(benchmarkTime);
|
||||
|
||||
JobInfoDO updatedJobInfo = new JobInfoDO();
|
||||
BeanUtils.copyProperties(jobInfoDO, updatedJobInfo);
|
||||
|
@ -5,13 +5,19 @@ import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.common.model.InstanceDetail;
|
||||
import com.github.kfcfans.oms.server.persistence.PageResult;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.service.CacheService;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceService;
|
||||
import com.github.kfcfans.oms.server.web.request.QueryInstanceRequest;
|
||||
import com.github.kfcfans.oms.server.web.response.InstanceLogVO;
|
||||
import com.github.kfcfans.oms.server.web.response.JobInfoVO;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang3.time.DateFormatUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Sort;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
@ -31,6 +37,12 @@ public class InstanceController {
|
||||
|
||||
@Resource
|
||||
private InstanceService instanceService;
|
||||
@Resource
|
||||
private CacheService cacheService;
|
||||
@Resource
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
|
||||
private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
|
||||
|
||||
@GetMapping("/stop")
|
||||
public ResultDTO<Void> stopInstance(Long instanceId) {
|
||||
@ -43,19 +55,49 @@ public class InstanceController {
|
||||
return ResultDTO.success(instanceService.getInstanceDetail(instanceId));
|
||||
}
|
||||
|
||||
@GetMapping("/list")
|
||||
public ResultDTO<PageResult<InstanceLogVO>> list(Long appId, int index, int pageSize) {
|
||||
@PostMapping("/list")
|
||||
public ResultDTO<PageResult<InstanceLogVO>> list(@RequestBody QueryInstanceRequest request) {
|
||||
|
||||
Page<InstanceLogDO> page = instanceService.listInstance(appId, index, pageSize);
|
||||
Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified");
|
||||
PageRequest pageable = PageRequest.of(request.getIndex(), request.getPageSize(), sort);
|
||||
|
||||
// 查询全部数据
|
||||
if (request.getJobId() == null && request.getInstanceId() == null) {
|
||||
return ResultDTO.success(convertPage(instanceLogRepository.findByAppId(request.getAppId(), pageable)));
|
||||
}
|
||||
|
||||
// 根据JobId查询
|
||||
if (request.getJobId() != null) {
|
||||
return ResultDTO.success(convertPage(instanceLogRepository.findByJobId(request.getJobId(), pageable)));
|
||||
}
|
||||
|
||||
// 根据InstanceId查询
|
||||
return ResultDTO.success(convertPage(instanceLogRepository.findByInstanceId(request.getInstanceId(), pageable)));
|
||||
}
|
||||
|
||||
private PageResult<InstanceLogVO> convertPage(Page<InstanceLogDO> page) {
|
||||
List<InstanceLogVO> content = page.getContent().stream().map(instanceLogDO -> {
|
||||
InstanceLogVO instanceLogVO = new InstanceLogVO();
|
||||
BeanUtils.copyProperties(instanceLogDO, instanceLogVO);
|
||||
|
||||
// 状态转化为中文
|
||||
instanceLogVO.setStatus(InstanceStatus.of(instanceLogDO.getStatus()).getDes());
|
||||
// 额外设置任务名称,提高可读性
|
||||
instanceLogVO.setJobName(cacheService.getJobName(instanceLogDO.getJobId()));
|
||||
|
||||
// 格式化时间
|
||||
instanceLogVO.setActualTriggerTime(DateFormatUtils.format(instanceLogDO.getActualTriggerTime(), TIME_PATTERN));
|
||||
if (instanceLogDO.getFinishedTime() == null) {
|
||||
instanceLogVO.setFinishedTime("N/A");
|
||||
}else {
|
||||
instanceLogVO.setFinishedTime(DateFormatUtils.format(instanceLogDO.getFinishedTime(), TIME_PATTERN));
|
||||
}
|
||||
|
||||
return instanceLogVO;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
PageResult<InstanceLogVO> pageResult = new PageResult<>(page);
|
||||
pageResult.setData(content);
|
||||
return ResultDTO.success(pageResult);
|
||||
return pageResult;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,97 @@
|
||||
package com.github.kfcfans.oms.server.web.controller;
|
||||
|
||||
import akka.actor.ActorSelection;
|
||||
import akka.pattern.Patterns;
|
||||
import com.github.kfcfans.common.InstanceStatus;
|
||||
import com.github.kfcfans.common.RemoteConstant;
|
||||
import com.github.kfcfans.common.model.SystemMetrics;
|
||||
import com.github.kfcfans.common.response.AskResponse;
|
||||
import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.akka.requests.FriendQueryWorkerClusterStatusReq;
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.web.response.SystemOverviewVO;
|
||||
import com.github.kfcfans.oms.server.web.response.WorkerStatusVO;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 系统信息控制器(服务于前端首页)
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/14
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/system")
|
||||
public class SystemInfoController {
|
||||
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
|
||||
@GetMapping("/listWorker")
|
||||
@SuppressWarnings("unchecked")
|
||||
public ResultDTO<List<WorkerStatusVO>> listWorker(Long appId) {
|
||||
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
|
||||
if (!appInfoOpt.isPresent()) {
|
||||
return ResultDTO.failed("unknown appId of " +appId);
|
||||
}
|
||||
String server =appInfoOpt.get().getCurrentServer();
|
||||
|
||||
// 没有Server
|
||||
if (StringUtils.isEmpty(server)) {
|
||||
return ResultDTO.success(Collections.emptyList());
|
||||
}
|
||||
|
||||
// 重定向到指定 Server 获取集群信息
|
||||
FriendQueryWorkerClusterStatusReq req = new FriendQueryWorkerClusterStatusReq(appId);
|
||||
try {
|
||||
ActorSelection friendActor = OhMyServer.getFriendActor(server);
|
||||
CompletionStage<Object> askCS = Patterns.ask(friendActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
|
||||
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||
|
||||
if (askResponse.isSuccess()) {
|
||||
Map<String, SystemMetrics> address2Info = (Map<String, SystemMetrics>) askResponse.getExtra();
|
||||
List<WorkerStatusVO> result = Lists.newLinkedList();
|
||||
address2Info.forEach((address, metrics) -> {
|
||||
WorkerStatusVO info = new WorkerStatusVO(address, metrics);
|
||||
result.add(info);
|
||||
});
|
||||
return ResultDTO.success(result);
|
||||
}
|
||||
return ResultDTO.failed(String.valueOf(askResponse.getExtra()));
|
||||
}catch (Exception e) {
|
||||
return ResultDTO.failed("no worker or server available");
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping("/overview")
|
||||
public ResultDTO<SystemOverviewVO> getSystemOverview(Long appId) {
|
||||
|
||||
SystemOverviewVO overview = new SystemOverviewVO();
|
||||
|
||||
// 总任务数量
|
||||
overview.setJobCount(jobInfoRepository.countByAppId(appId));
|
||||
// 运行任务数
|
||||
overview.setRunningInstanceCount(jobInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));
|
||||
// 近期失败任务数(24H内)
|
||||
Date date = DateUtils.addDays(new Date(), -1);
|
||||
overview.setFailedInstanceCount(jobInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date));
|
||||
|
||||
return ResultDTO.success(overview);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
package com.github.kfcfans.oms.server.web.request;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 任务实例查询对象
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/14
|
||||
*/
|
||||
@Data
|
||||
public class QueryInstanceRequest {
|
||||
|
||||
// 任务所属应用ID
|
||||
private Long appId;
|
||||
// 当前页码
|
||||
private Integer index;
|
||||
// 页大小
|
||||
private Integer pageSize;
|
||||
|
||||
// 查询条件
|
||||
private Long instanceId;
|
||||
private Long jobId;
|
||||
}
|
@ -15,28 +15,24 @@ public class InstanceLogVO {
|
||||
|
||||
// 任务ID
|
||||
private Long jobId;
|
||||
// 任务所属应用的ID,冗余提高查询效率
|
||||
private Long appId;
|
||||
// 任务名称
|
||||
private String jobName;
|
||||
// 任务实例ID
|
||||
private Long instanceId;
|
||||
|
||||
// 执行结果
|
||||
private String result;
|
||||
// 预计触发时间
|
||||
private Long expectedTriggerTime;
|
||||
// 实际触发时间
|
||||
private Long actualTriggerTime;
|
||||
// 结束时间
|
||||
private Long finishedTime;
|
||||
|
||||
// TaskTracker地址
|
||||
private String taskTrackerAddress;
|
||||
|
||||
// 总共执行的次数(用于重试判断)
|
||||
private Long runningTimes;
|
||||
|
||||
private Date gmtCreate;
|
||||
private Date gmtModified;
|
||||
|
||||
/* ********** 不一致区域 ********** */
|
||||
private String status;
|
||||
// 实际触发时间(需要格式化为人看得懂的时间)
|
||||
private String actualTriggerTime;
|
||||
// 结束时间(同理,需要格式化)
|
||||
private String finishedTime;
|
||||
}
|
||||
|
@ -0,0 +1,16 @@
|
||||
package com.github.kfcfans.oms.server.web.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 系统概览
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/14
|
||||
*/
|
||||
@Data
|
||||
public class SystemOverviewVO {
|
||||
private long jobCount;
|
||||
private long runningInstanceCount;
|
||||
private long failedInstanceCount;
|
||||
}
|
@ -0,0 +1,61 @@
|
||||
package com.github.kfcfans.oms.server.web.response;
|
||||
|
||||
import com.github.kfcfans.common.model.SystemMetrics;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.text.DecimalFormat;
|
||||
|
||||
/**
|
||||
* Worker机器状态
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/14
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class WorkerStatusVO {
|
||||
|
||||
private String address;
|
||||
private String cpuLoad;
|
||||
private String memoryLoad;
|
||||
private String diskLoad;
|
||||
|
||||
// 1 -> 健康,绿色,2 -> 一般,橙色,3 -> 糟糕,红色
|
||||
private int status;
|
||||
|
||||
// 12.3%(4 cores)
|
||||
private static final String CPU_FORMAT = "%s%%(%d cores)";
|
||||
// 27.7%(2.9/8.0 GB)
|
||||
private static final String OTHER_FORMAT = "%s%%(%s/%s GB)";
|
||||
private static final DecimalFormat df = new DecimalFormat("#.#");
|
||||
|
||||
private static final double threshold = 0.8;
|
||||
|
||||
public WorkerStatusVO(String address, SystemMetrics systemMetrics) {
|
||||
this.address = address;
|
||||
|
||||
String cpuL = df.format(systemMetrics.getCpuLoad() * 100);
|
||||
this.cpuLoad = String.format(CPU_FORMAT, cpuL, systemMetrics.getCpuProcessors());
|
||||
|
||||
|
||||
String menL = df.format(systemMetrics.getJvmMemoryUsage() * 100);
|
||||
String menUsed = df.format(systemMetrics.getJvmUsedMemory());
|
||||
String menMax = df.format(systemMetrics.getJvmMaxMemory());
|
||||
this.memoryLoad = String.format(OTHER_FORMAT, menL, menUsed, menMax);
|
||||
|
||||
String diskL = df.format(systemMetrics.getDiskUsage() * 100);
|
||||
String diskUsed = df.format(systemMetrics.getDiskUsed());
|
||||
String diskMax = df.format(systemMetrics.getDiskTotal());
|
||||
this.diskLoad = String.format(OTHER_FORMAT, diskL, diskUsed, diskMax);
|
||||
|
||||
|
||||
if (systemMetrics.getCpuLoad() < threshold && systemMetrics.getDiskUsage() < threshold && systemMetrics.getJvmMemoryUsage() < threshold) {
|
||||
status = 1;
|
||||
}else if (systemMetrics.getCpuLoad() > threshold && systemMetrics.getDiskUsage() > threshold && systemMetrics.getJvmMemoryUsage() > threshold) {
|
||||
status = 3;
|
||||
}else {
|
||||
status = 2;
|
||||
}
|
||||
}
|
||||
}
|
@ -24,12 +24,14 @@ public class SystemInfoUtils {
|
||||
|
||||
// CPU 信息
|
||||
metrics.setCpuProcessors(osMXBean.getAvailableProcessors());
|
||||
metrics.setCpuLoad(osMXBean.getSystemLoadAverage());
|
||||
metrics.setCpuLoad(osMXBean.getSystemLoadAverage() / osMXBean.getAvailableProcessors());
|
||||
|
||||
// JVM内存信息(maxMemory指JVM能从操作系统获取的最大内存,即-Xmx参数设置的值,totalMemory指JVM当前持久的总内存)
|
||||
metrics.setJvmMaxMemory(bytes2GB(runtime.maxMemory()));
|
||||
metrics.setJvmTotalMemory(bytes2GB(runtime.totalMemory()));
|
||||
metrics.setJvmUsedMemory(metrics.getJvmTotalMemory() - bytes2GB(runtime.freeMemory()));
|
||||
// 已使用内存:当前申请总量 - 当前空余量
|
||||
metrics.setJvmUsedMemory(bytes2GB(runtime.totalMemory() - runtime.freeMemory()));
|
||||
// 百分比,直接 * 100
|
||||
metrics.setJvmMemoryUsage(1.0 * metrics.getJvmUsedMemory() / runtime.maxMemory());
|
||||
|
||||
// 磁盘信息
|
||||
long free = 0;
|
||||
@ -42,7 +44,7 @@ public class SystemInfoUtils {
|
||||
|
||||
metrics.setDiskUsed(bytes2GB(total - free));
|
||||
metrics.setDiskTotal(bytes2GB(total));
|
||||
metrics.setDiskUsage(metrics.getDiskUsed() / metrics.getDiskTotal());
|
||||
metrics.setDiskUsage(metrics.getDiskUsed() / metrics.getDiskTotal() * 1.0);
|
||||
|
||||
// 在Worker完成分数计算,减小Server压力
|
||||
metrics.calculateScore();
|
||||
|
@ -83,6 +83,10 @@ public class CommonTaskTracker extends TaskTracker {
|
||||
* 任务是否超时
|
||||
*/
|
||||
public boolean isTimeout() {
|
||||
// 时间不限
|
||||
if (instanceInfo.getInstanceTimeoutMS() <= 0) {
|
||||
return false;
|
||||
}
|
||||
return System.currentTimeMillis() - createTime > instanceInfo.getInstanceTimeoutMS();
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user