mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
I have to say CRUD is really boring but I have no choice...
This commit is contained in:
parent
9ce850eaf4
commit
87d4654930
@ -17,8 +17,8 @@ import java.util.List;
|
||||
public enum InstanceStatus {
|
||||
|
||||
WAITING_DISPATCH(1, "等待任务派发"),
|
||||
WAITING_WORKER_RECEIVE(2, "Server已完成任务派发,等待Worker接收"),
|
||||
RUNNING(3, "Worker接收成功,正在运行任务"),
|
||||
WAITING_WORKER_RECEIVE(2, "等待Worker接收"),
|
||||
RUNNING(3, "运行中"),
|
||||
FAILED(4, "任务运行失败"),
|
||||
SUCCEED(5, "任务运行成功"),
|
||||
STOPPED(10, "任务被手动停止");
|
||||
|
@ -1,7 +1,9 @@
|
||||
package com.github.kfcfans.oms.server.service.instance;
|
||||
package com.github.kfcfans.common.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 任务实例的运行详细信息(对外)
|
||||
*
|
||||
@ -9,7 +11,7 @@ import lombok.Data;
|
||||
* @since 2020/4/11
|
||||
*/
|
||||
@Data
|
||||
public class InstanceDetail {
|
||||
public class InstanceDetail implements Serializable {
|
||||
|
||||
// 任务整体开始时间
|
||||
private long actualTriggerTime;
|
||||
@ -26,15 +28,17 @@ public class InstanceDetail {
|
||||
|
||||
|
||||
// 秒级任务的 extra -> List<SubInstanceDetail>
|
||||
private static class SubInstanceDetail {
|
||||
@Data
|
||||
public static class SubInstanceDetail implements Serializable {
|
||||
private long startTime;
|
||||
private long finishedTime;
|
||||
private String status;
|
||||
private String result;
|
||||
private String status;
|
||||
}
|
||||
|
||||
// MapReduce 和 Broadcast 任务的 extra ->
|
||||
private static class ClusterDetail {
|
||||
@Data
|
||||
public static class TaskDetail implements Serializable {
|
||||
private long totalTaskNum;
|
||||
private long succeedTaskNum;
|
||||
private long failedTaskNum;
|
@ -31,8 +31,6 @@ public class SystemMetrics implements Serializable, Comparable<SystemMetrics> {
|
||||
// 缓存分数
|
||||
private int score;
|
||||
|
||||
public static final int MIN_SCORE = 1;
|
||||
|
||||
@Override
|
||||
public int compareTo(SystemMetrics that) {
|
||||
return this.calculateScore() - that.calculateScore();
|
||||
@ -50,21 +48,25 @@ public class SystemMetrics implements Serializable, Comparable<SystemMetrics> {
|
||||
|
||||
double availableCPUCores = cpuProcessors * cpuLoad;
|
||||
double availableMemory = jvmMaxMemory - jvmUsedMemory;
|
||||
double availableDisk = diskTotal - diskUsage;
|
||||
|
||||
// 保护性判断,Windows下无法获取CPU可用核心数,先固定 0.5
|
||||
if (availableCPUCores < 0) {
|
||||
availableCPUCores = 0.5;
|
||||
}
|
||||
// Windows下无法获取CPU可用核心数,值固定为-1
|
||||
cpuLoad = Math.max(0, cpuLoad);
|
||||
|
||||
// 最低运行标准,1G磁盘 & 0.5G内存 & 一个可用的CPU核心
|
||||
if (availableDisk < 1 || availableMemory < 0.5 || availableCPUCores < 0.5) {
|
||||
score = MIN_SCORE;
|
||||
} else {
|
||||
// 磁盘只需要满足最低标准即可
|
||||
score = (int) (availableMemory * 2 + availableCPUCores);
|
||||
}
|
||||
return (int) (availableMemory * 2 + availableCPUCores);
|
||||
}
|
||||
|
||||
return score;
|
||||
/**
|
||||
* 该机器是否可用
|
||||
* @param minCPUCores 判断标准之最低可用CPU核心数量
|
||||
* @param minMemorySpace 判断标准之最低可用内存
|
||||
* @param minDiskSpace 判断标准之最低可用磁盘空间
|
||||
* @return 是否可用
|
||||
*/
|
||||
public boolean available(double minCPUCores, double minMemorySpace, double minDiskSpace) {
|
||||
|
||||
double currentCpuCores = Math.max(cpuLoad * cpuProcessors, 0);
|
||||
double currentMemory = jvmMaxMemory - jvmUsedMemory;
|
||||
double currentDisk = diskTotal - diskUsed;
|
||||
return currentCpuCores >= minCPUCores && currentMemory >= minMemorySpace && currentDisk >= minDiskSpace;
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package com.github.kfcfans.common.request;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@ -11,6 +13,8 @@ import java.io.Serializable;
|
||||
* @since 2020/4/10
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ServerQueryInstanceStatusReq implements Serializable {
|
||||
private Long instanceId;
|
||||
}
|
||||
|
@ -0,0 +1,47 @@
|
||||
package com.github.kfcfans.oms.server.persistence;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.data.domain.Page;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 分页对象
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/12
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class PageResult<T> implements Serializable {
|
||||
|
||||
/**
|
||||
* 当前页数
|
||||
*/
|
||||
private int index;
|
||||
/**
|
||||
* 页大小
|
||||
*/
|
||||
private int pageSize;
|
||||
/**
|
||||
* 总页数
|
||||
*/
|
||||
private int totalPages;
|
||||
/**
|
||||
* 总数据量
|
||||
*/
|
||||
private long totalItems;
|
||||
/**
|
||||
* 数据
|
||||
*/
|
||||
private List<T> data;
|
||||
|
||||
public PageResult(Page<?> page) {
|
||||
index = page.getNumber();
|
||||
pageSize = page.getSize();
|
||||
totalPages = page.getTotalPages();
|
||||
totalItems = page.getTotalElements();
|
||||
}
|
||||
}
|
@ -13,8 +13,8 @@ import java.util.Date;
|
||||
*/
|
||||
@Data
|
||||
@Entity
|
||||
@Table(name = "execute_log", indexes = {@Index(columnList = "jobId")})
|
||||
public class ExecuteLogDO {
|
||||
@Table(name = "instance_log", indexes = {@Index(columnList = "jobId")})
|
||||
public class InstanceLogDO {
|
||||
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
@ -65,6 +65,14 @@ public class JobInfoDO {
|
||||
// 下一次调度时间
|
||||
private Long nextTriggerTime;
|
||||
|
||||
/* ************************** 繁忙机器配置 ************************** */
|
||||
// 最低CPU核心数量,0代表不限
|
||||
private double minCpuCores;
|
||||
// 最低内存空间,单位 GB,0代表不限
|
||||
private double minMemorySpace;
|
||||
// 最低磁盘空间,单位 GB,0代表不限
|
||||
private double minDiskSpace;
|
||||
|
||||
|
||||
private Date gmtCreate;
|
||||
private Date gmtModified;
|
||||
|
@ -0,0 +1,33 @@
|
||||
package com.github.kfcfans.oms.server.persistence.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import javax.persistence.*;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 用户信息表
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/12
|
||||
*/
|
||||
@Data
|
||||
@Entity
|
||||
@Table(name = "user_info")
|
||||
public class UserInfoDO {
|
||||
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
private Long id;
|
||||
|
||||
private String username;
|
||||
private String password;
|
||||
|
||||
// 手机号
|
||||
private String phone;
|
||||
// 邮箱地址
|
||||
private String email;
|
||||
|
||||
private Date gmtCreate;
|
||||
private Date gmtModified;
|
||||
}
|
@ -1,6 +1,8 @@
|
||||
package com.github.kfcfans.oms.server.persistence.repository;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
@ -15,14 +17,14 @@ import java.util.List;
|
||||
* @author tjq
|
||||
* @since 2020/4/1
|
||||
*/
|
||||
public interface ExecuteLogRepository extends JpaRepository<ExecuteLogDO, Long> {
|
||||
public interface InstanceLogRepository extends JpaRepository<InstanceLogDO, Long> {
|
||||
|
||||
/**
|
||||
* 统计当前JOB有多少实例正在运行
|
||||
*/
|
||||
long countByJobIdAndStatusIn(long jobId, List<Integer> status);
|
||||
|
||||
List<ExecuteLogDO> findByJobIdAndStatusIn(long jobId, List<Integer> status);
|
||||
List<InstanceLogDO> findByJobIdAndStatusIn(long jobId, List<Integer> status);
|
||||
|
||||
|
||||
/**
|
||||
@ -44,9 +46,11 @@ public interface ExecuteLogRepository extends JpaRepository<ExecuteLogDO, Long>
|
||||
int update4FrequentJob(long instanceId, int status, long runningTimes);
|
||||
|
||||
// 状态检查三兄弟,对应 WAITING_DISPATCH 、 WAITING_WORKER_RECEIVE 和 RUNNING 三阶段
|
||||
List<ExecuteLogDO> findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> jobIds, int status, long time);
|
||||
List<ExecuteLogDO> findByJobIdInAndStatusAndActualTriggerTimeLessThan(List<Long> jobIds, int status, long time);
|
||||
List<ExecuteLogDO> findByJobIdInAndStatusAndGmtModifiedBefore(List<Long> jobIds, int status, Date time);
|
||||
List<InstanceLogDO> findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> jobIds, int status, long time);
|
||||
List<InstanceLogDO> findByJobIdInAndStatusAndActualTriggerTimeLessThan(List<Long> jobIds, int status, long time);
|
||||
List<InstanceLogDO> findByJobIdInAndStatusAndGmtModifiedBefore(List<Long> jobIds, int status, Date time);
|
||||
|
||||
ExecuteLogDO findByInstanceId(long instanceId);
|
||||
InstanceLogDO findByInstanceId(long instanceId);
|
||||
|
||||
Page<InstanceLogDO> findByAppId(long appId, Pageable pageable);
|
||||
}
|
@ -1,6 +1,8 @@
|
||||
package com.github.kfcfans.oms.server.persistence.repository;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import java.util.List;
|
||||
@ -16,5 +18,5 @@ public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
|
||||
|
||||
List<JobInfoDO> findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List<Long> appIds, int status, int timeExpressionType, long time);
|
||||
|
||||
List<JobInfoDO> findByAppIdInAndStatusAndTimeExpressionType(List<Long> appIds, int status, int timeExpressionType);
|
||||
Page<JobInfoDO> findByAppId(Long appId, Pageable pageable);
|
||||
}
|
||||
|
@ -0,0 +1,13 @@
|
||||
package com.github.kfcfans.oms.server.persistence.repository;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.UserInfoDO;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
/**
|
||||
* 用户信息表数据库访问层
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/12
|
||||
*/
|
||||
public interface UserInfoRepository extends JpaRepository<UserInfoDO, Long> {
|
||||
}
|
@ -5,13 +5,12 @@ import com.github.kfcfans.common.*;
|
||||
import com.github.kfcfans.common.request.ServerScheduleJobReq;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
@ -30,7 +29,7 @@ import static com.github.kfcfans.common.InstanceStatus.*;
|
||||
public class DispatchService {
|
||||
|
||||
@Resource
|
||||
private ExecuteLogRepository executeLogRepository;
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
|
||||
private static final String EMPTY_RESULT = "";
|
||||
|
||||
@ -46,31 +45,26 @@ public class DispatchService {
|
||||
log.info("[DispatchService] start to dispatch job: {}.", jobInfo);
|
||||
// 查询当前运行的实例数
|
||||
long current = System.currentTimeMillis();
|
||||
long runningInstanceCount = executeLogRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus);
|
||||
long runningInstanceCount = instanceLogRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus);
|
||||
|
||||
// 超出最大同时运行限制,不执行调度
|
||||
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
|
||||
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
|
||||
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount);
|
||||
executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, result);
|
||||
instanceLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, result);
|
||||
return;
|
||||
}
|
||||
|
||||
// 获取 Worker
|
||||
String taskTrackerAddress = WorkerManagerService.chooseBestWorker(jobInfo.getAppId());
|
||||
List<String> allAvailableWorker = WorkerManagerService.getAllAvailableWorker(jobInfo.getAppId());
|
||||
List<String> allAvailableWorker = WorkerManagerService.getSortedAvailableWorker(jobInfo.getAppId(), jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
|
||||
|
||||
if (StringUtils.isEmpty(taskTrackerAddress)) {
|
||||
if (CollectionUtils.isEmpty(allAvailableWorker)) {
|
||||
String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId());
|
||||
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription);
|
||||
executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE);
|
||||
instanceLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE);
|
||||
return;
|
||||
}
|
||||
|
||||
// 消除非原子操作带来的潜在不一致
|
||||
allAvailableWorker.remove(taskTrackerAddress);
|
||||
allAvailableWorker.add(taskTrackerAddress);
|
||||
|
||||
// 构造请求
|
||||
ServerScheduleJobReq req = new ServerScheduleJobReq();
|
||||
BeanUtils.copyProperties(jobInfo, req);
|
||||
@ -86,11 +80,12 @@ public class DispatchService {
|
||||
req.setThreadConcurrency(jobInfo.getConcurrency());
|
||||
|
||||
// 发送请求(不可靠,需要一个后台线程定期轮询状态)
|
||||
String taskTrackerAddress = allAvailableWorker.get(0);
|
||||
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress);
|
||||
taskTrackerActor.tell(req, null);
|
||||
log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString());
|
||||
|
||||
// 修改状态
|
||||
executeLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, EMPTY_RESULT);
|
||||
instanceLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, EMPTY_RESULT);
|
||||
}
|
||||
}
|
||||
|
@ -50,74 +50,37 @@ public class ClusterStatusHolder {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 选取状态最好的Worker进行任务派发
|
||||
* @return Worker的地址(null代表没有可用的Worker)
|
||||
*/
|
||||
public String chooseBestWorker() {
|
||||
|
||||
// 直接对 HashMap 根据Value进行排序
|
||||
List<Map.Entry<String, SystemMetrics>> entryList = Lists.newArrayList(address2Metrics.entrySet());
|
||||
|
||||
// 降序排序(Comparator.comparingInt默认为升序,弃用)
|
||||
entryList.sort((o1, o2) -> o2.getValue().calculateScore() - o1.getValue().calculateScore());
|
||||
|
||||
for (Map.Entry<String, SystemMetrics> entry : address2Metrics.entrySet()) {
|
||||
String address = entry.getKey();
|
||||
if (available(address)) {
|
||||
return address;
|
||||
}
|
||||
}
|
||||
|
||||
log.warn("[ClusterStatusHolder] no worker available for {}, worker status is {}.", appName, address2Metrics);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前所有可用的 Worker
|
||||
* @param minCPUCores 最低CPU核心数量
|
||||
* @param minMemorySpace 最低内存可用空间,单位GB
|
||||
* @param minDiskSpace 最低磁盘可用空间,单位GB
|
||||
* @return List<Worker>
|
||||
*/
|
||||
public List<String> getAllAvailableWorker() {
|
||||
public List<String> getSortedAvailableWorker(double minCPUCores, double minMemorySpace, double minDiskSpace) {
|
||||
List<String> workers = Lists.newLinkedList();
|
||||
|
||||
address2Metrics.forEach((address, ignore) -> {
|
||||
if (available(address)) {
|
||||
address2Metrics.forEach((address, metrics) -> {
|
||||
|
||||
// 排除超时机器
|
||||
Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L);
|
||||
long timeout = System.currentTimeMillis() - lastActiveTime;
|
||||
if (timeout > WORKER_TIMEOUT_MS) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 判断指标
|
||||
if (metrics.available(minCPUCores, minMemorySpace, minDiskSpace)) {
|
||||
workers.add(address);
|
||||
}
|
||||
});
|
||||
|
||||
// 按机器健康度排序
|
||||
workers.sort((o1, o2) -> address2Metrics.get(o2).calculateScore() - address2Metrics.get(o1).calculateScore());
|
||||
|
||||
return workers;
|
||||
}
|
||||
|
||||
/**
|
||||
* 某台具体的 Worker 是否可用
|
||||
* @param address 需要检测的Worker地址
|
||||
* @return 可用状态
|
||||
*/
|
||||
private boolean available(String address) {
|
||||
SystemMetrics metrics = address2Metrics.get(address);
|
||||
if (metrics.calculateScore() == SystemMetrics.MIN_SCORE) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L);
|
||||
long timeout = System.currentTimeMillis() - lastActiveTime;
|
||||
return timeout < WORKER_TIMEOUT_MS;
|
||||
}
|
||||
|
||||
/**
|
||||
* 整个 Worker 集群是否可用(某个App下的所有机器是否可用)
|
||||
* @return 有一台机器可用 -> true / 全军覆没 -> false
|
||||
*/
|
||||
public boolean available() {
|
||||
for (String address : address2Metrics.keySet()) {
|
||||
if (available(address)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取整个集群的简介
|
||||
* @return 获取集群简介
|
||||
|
@ -1,7 +1,6 @@
|
||||
package com.github.kfcfans.oms.server.service.ha;
|
||||
|
||||
import com.github.kfcfans.common.request.WorkerHeartbeat;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -32,29 +31,15 @@ public class WorkerManagerService {
|
||||
}
|
||||
|
||||
/**
|
||||
* 选择状态最好的Worker执行任务
|
||||
* @param appId 应用ID
|
||||
* @return Worker的地址(null代表没有可用的Worker)
|
||||
* 获取有序的当前所有可用的Worker地址(按得分高低排序,排在前面的健康度更高)
|
||||
*/
|
||||
public static String chooseBestWorker(Long appId) {
|
||||
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
|
||||
if (clusterStatusHolder == null) {
|
||||
log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId);
|
||||
return null;
|
||||
}
|
||||
return clusterStatusHolder.chooseBestWorker();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前所有可用的Worker地址
|
||||
*/
|
||||
public static List<String> getAllAvailableWorker(Long appId) {
|
||||
public static List<String> getSortedAvailableWorker(Long appId, double minCPUCores, double minMemorySpace, double minDiskSpace) {
|
||||
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
|
||||
if (clusterStatusHolder == null) {
|
||||
log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return clusterStatusHolder.getAllAvailableWorker();
|
||||
return clusterStatusHolder.getSortedAvailableWorker(minCPUCores, minMemorySpace, minDiskSpace);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4,9 +4,9 @@ import com.github.kfcfans.common.InstanceStatus;
|
||||
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
|
||||
import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.server.common.utils.SpringUtils;
|
||||
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.DispatchService;
|
||||
import com.google.common.collect.Maps;
|
||||
@ -33,7 +33,7 @@ public class InstanceManager {
|
||||
|
||||
// Spring Bean
|
||||
private static DispatchService dispatchService;
|
||||
private static ExecuteLogRepository executeLogRepository;
|
||||
private static InstanceLogRepository instanceLogRepository;
|
||||
private static JobInfoRepository jobInfoRepository;
|
||||
|
||||
/**
|
||||
@ -90,11 +90,11 @@ public class InstanceManager {
|
||||
// FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行)
|
||||
// 综上,直接把 status 和 runningNum 同步到DB即可
|
||||
if (timeExpressionType != TimeExpressionType.CRON.getV()) {
|
||||
getExecuteLogRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum());
|
||||
getInstanceLogRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum());
|
||||
return;
|
||||
}
|
||||
|
||||
ExecuteLogDO updateEntity = getExecuteLogRepository().findByInstanceId(instanceId);
|
||||
InstanceLogDO updateEntity = getInstanceLogRepository().findByInstanceId(instanceId);
|
||||
updateEntity.setStatus(newStatus.getV());
|
||||
updateEntity.setGmtModified(new Date());
|
||||
|
||||
@ -121,7 +121,7 @@ public class InstanceManager {
|
||||
}
|
||||
|
||||
// 同步状态变更信息到数据库
|
||||
getExecuteLogRepository().saveAndFlush(updateEntity);
|
||||
getInstanceLogRepository().saveAndFlush(updateEntity);
|
||||
|
||||
// 清除已完成的实例信息
|
||||
if (finished) {
|
||||
@ -153,15 +153,15 @@ public class InstanceManager {
|
||||
return instanceId2StatusHolder.get(instanceId);
|
||||
}
|
||||
|
||||
private static ExecuteLogRepository getExecuteLogRepository() {
|
||||
while (executeLogRepository == null) {
|
||||
private static InstanceLogRepository getInstanceLogRepository() {
|
||||
while (instanceLogRepository == null) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
}catch (Exception ignore) {
|
||||
}
|
||||
executeLogRepository = SpringUtils.getBean(ExecuteLogRepository.class);
|
||||
instanceLogRepository = SpringUtils.getBean(InstanceLogRepository.class);
|
||||
}
|
||||
return executeLogRepository;
|
||||
return instanceLogRepository;
|
||||
}
|
||||
|
||||
private static JobInfoRepository getJobInfoRepository() {
|
||||
|
@ -1,20 +1,27 @@
|
||||
package com.github.kfcfans.oms.server.service.instance;
|
||||
|
||||
import akka.actor.ActorSelection;
|
||||
import akka.pattern.Patterns;
|
||||
import com.github.kfcfans.common.InstanceStatus;
|
||||
import com.github.kfcfans.common.RemoteConstant;
|
||||
import com.github.kfcfans.common.SystemInstanceResult;
|
||||
import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.common.model.InstanceDetail;
|
||||
import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq;
|
||||
import com.github.kfcfans.common.request.ServerStopInstanceReq;
|
||||
import com.github.kfcfans.common.response.AskResponse;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.data.domain.*;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.Duration;
|
||||
import java.util.Date;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.github.kfcfans.common.InstanceStatus.RUNNING;
|
||||
import static com.github.kfcfans.common.InstanceStatus.STOPPED;
|
||||
@ -30,9 +37,7 @@ import static com.github.kfcfans.common.InstanceStatus.STOPPED;
|
||||
public class InstanceService {
|
||||
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
@Resource
|
||||
private ExecuteLogRepository executeLogRepository;
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
|
||||
/**
|
||||
* 停止任务实例
|
||||
@ -40,48 +45,84 @@ public class InstanceService {
|
||||
*/
|
||||
public void stopInstance(Long instanceId) {
|
||||
|
||||
ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId);
|
||||
if (executeLogDO == null) {
|
||||
InstanceLogDO instanceLogDO = instanceLogRepository.findByInstanceId(instanceId);
|
||||
if (instanceLogDO == null) {
|
||||
log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId);
|
||||
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
|
||||
}
|
||||
// 更新数据库,将状态置为停止
|
||||
executeLogDO.setStatus(STOPPED.getV());
|
||||
executeLogDO.setGmtModified(new Date());
|
||||
executeLogDO.setFinishedTime(System.currentTimeMillis());
|
||||
executeLogDO.setResult(SystemInstanceResult.STOPPED_BY_USER);
|
||||
executeLogRepository.saveAndFlush(executeLogDO);
|
||||
instanceLogDO.setStatus(STOPPED.getV());
|
||||
instanceLogDO.setGmtModified(new Date());
|
||||
instanceLogDO.setFinishedTime(System.currentTimeMillis());
|
||||
instanceLogDO.setResult(SystemInstanceResult.STOPPED_BY_USER);
|
||||
instanceLogRepository.saveAndFlush(instanceLogDO);
|
||||
|
||||
// 停止 TaskTracker
|
||||
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(executeLogDO.getTaskTrackerAddress());
|
||||
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceLogDO.getTaskTrackerAddress());
|
||||
ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId);
|
||||
taskTrackerActor.tell(req, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取任务实例的详细运行详细
|
||||
* @param instanceId 任务实例ID
|
||||
* @return 详细运行状态
|
||||
*/
|
||||
public InstanceDetail getInstanceDetail(Long instanceId) {
|
||||
|
||||
ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId);
|
||||
if (executeLogDO == null) {
|
||||
InstanceLogDO instanceLogDO = instanceLogRepository.findByInstanceId(instanceId);
|
||||
if (instanceLogDO == null) {
|
||||
log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId);
|
||||
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
|
||||
}
|
||||
|
||||
InstanceStatus instanceStatus = InstanceStatus.of(executeLogDO.getStatus());
|
||||
InstanceStatus instanceStatus = InstanceStatus.of(instanceLogDO.getStatus());
|
||||
|
||||
InstanceDetail detail = new InstanceDetail();
|
||||
detail.setStatus(instanceStatus.getDes());
|
||||
|
||||
// 只要不是运行状态,只需要返回简要信息
|
||||
if (instanceStatus != RUNNING) {
|
||||
BeanUtils.copyProperties(executeLogDO, detail);
|
||||
BeanUtils.copyProperties(instanceLogDO, detail);
|
||||
return detail;
|
||||
}
|
||||
|
||||
// 运行状态下,需要分别考虑MapReduce、Broadcast和秒级任务的详细信息
|
||||
// 运行状态下,交由 TaskTracker 返回相关信息
|
||||
try {
|
||||
ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId);
|
||||
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceLogDO.getTaskTrackerAddress());
|
||||
CompletionStage<Object> askCS = Patterns.ask(taskTrackerActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
|
||||
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||
|
||||
if (askResponse.isSuccess()) {
|
||||
return (InstanceDetail) askResponse.getExtra();
|
||||
}else {
|
||||
log.warn("[InstanceService] ask InstanceStatus from TaskTracker failed, the message is {}.", askResponse.getExtra());
|
||||
}
|
||||
|
||||
}catch (Exception e) {
|
||||
log.error("[InstanceService] ask InstanceStatus from TaskTracker failed.", e);
|
||||
}
|
||||
|
||||
return null;
|
||||
// 失败则返回基础版信息
|
||||
BeanUtils.copyProperties(instanceLogDO, detail);
|
||||
return detail;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取任务实例列表
|
||||
* @param appId 应用ID
|
||||
* @param page 页码
|
||||
* @param size 页大小
|
||||
* @return 分页对象
|
||||
*/
|
||||
public Page<InstanceLogDO> listInstance(long appId, int page, int size) {
|
||||
|
||||
// 按预计触发时间排序
|
||||
Sort sort = Sort.by(Sort.Direction.DESC, "expectedTriggerTime");
|
||||
PageRequest pageRequest = PageRequest.of(page, size, sort);
|
||||
|
||||
return instanceLogRepository.findByAppId(appId, pageRequest);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -6,10 +6,10 @@ import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.server.common.constans.JobStatus;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.DispatchService;
|
||||
import com.google.common.base.Stopwatch;
|
||||
@ -44,7 +44,7 @@ public class InstanceStatusCheckService {
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
@Resource
|
||||
private ExecuteLogRepository executeLogRepository;
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
|
||||
@ -73,7 +73,7 @@ public class InstanceStatusCheckService {
|
||||
|
||||
// 1. 检查等待 WAITING_DISPATCH 状态的任务
|
||||
long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
|
||||
List<ExecuteLogDO> waitingDispatchInstances = executeLogRepository.findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold);
|
||||
List<InstanceLogDO> waitingDispatchInstances = instanceLogRepository.findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold);
|
||||
if (!CollectionUtils.isEmpty(waitingDispatchInstances)) {
|
||||
log.warn("[InstanceStatusCheckService] instances({}) is not triggered as expected.", waitingDispatchInstances);
|
||||
waitingDispatchInstances.forEach(instance -> {
|
||||
@ -85,7 +85,7 @@ public class InstanceStatusCheckService {
|
||||
|
||||
// 2. 检查 WAITING_WORKER_RECEIVE 状态的任务
|
||||
threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
|
||||
List<ExecuteLogDO> waitingWorkerReceiveInstances = executeLogRepository.findByJobIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
|
||||
List<InstanceLogDO> waitingWorkerReceiveInstances = instanceLogRepository.findByJobIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
|
||||
if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) {
|
||||
log.warn("[InstanceStatusCheckService] instances({}) did n’t receive any reply from worker.", waitingWorkerReceiveInstances);
|
||||
waitingWorkerReceiveInstances.forEach(instance -> {
|
||||
@ -97,7 +97,7 @@ public class InstanceStatusCheckService {
|
||||
|
||||
// 3. 检查 RUNNING 状态的任务(一定时间没收到 TaskTracker 的状态报告,视为失败)
|
||||
threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
|
||||
List<ExecuteLogDO> failedInstances = executeLogRepository.findByJobIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold));
|
||||
List<InstanceLogDO> failedInstances = instanceLogRepository.findByJobIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold));
|
||||
if (!CollectionUtils.isEmpty(failedInstances)) {
|
||||
log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances);
|
||||
failedInstances.forEach(instance -> {
|
||||
@ -133,11 +133,11 @@ public class InstanceStatusCheckService {
|
||||
/**
|
||||
* 处理上报超时而失败的任务实例
|
||||
*/
|
||||
private void updateFailedInstance(ExecuteLogDO instance) {
|
||||
private void updateFailedInstance(InstanceLogDO instance) {
|
||||
instance.setStatus(InstanceStatus.FAILED.getV());
|
||||
instance.setFinishedTime(System.currentTimeMillis());
|
||||
instance.setGmtModified(new Date());
|
||||
instance.setResult(SystemInstanceResult.REPORT_TIMEOUT);
|
||||
executeLogRepository.saveAndFlush(instance);
|
||||
instanceLogRepository.saveAndFlush(instance);
|
||||
}
|
||||
}
|
||||
|
@ -7,11 +7,11 @@ import com.github.kfcfans.oms.server.common.utils.CronExpression;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.service.DispatchService;
|
||||
import com.github.kfcfans.oms.server.service.IdGenerateService;
|
||||
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
|
||||
@ -52,7 +52,7 @@ public class JobScheduleService {
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private ExecuteLogRepository executeLogRepository;
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
|
||||
private static final long SCHEDULE_RATE = 5000;
|
||||
|
||||
@ -103,10 +103,10 @@ public class JobScheduleService {
|
||||
Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
|
||||
log.info("[JobScheduleService] These cron jobs will be scheduled: {}.", jobInfos);
|
||||
|
||||
List<ExecuteLogDO> executeLogs = Lists.newLinkedList();
|
||||
List<InstanceLogDO> executeLogs = Lists.newLinkedList();
|
||||
jobInfos.forEach(jobInfoDO -> {
|
||||
|
||||
ExecuteLogDO executeLog = new ExecuteLogDO();
|
||||
InstanceLogDO executeLog = new InstanceLogDO();
|
||||
executeLog.setJobId(jobInfoDO.getId());
|
||||
executeLog.setAppId(jobInfoDO.getAppId());
|
||||
executeLog.setInstanceId(IdGenerateService.allocate());
|
||||
@ -119,8 +119,8 @@ public class JobScheduleService {
|
||||
|
||||
jobId2InstanceId.put(executeLog.getJobId(), executeLog.getInstanceId());
|
||||
});
|
||||
executeLogRepository.saveAll(executeLogs);
|
||||
executeLogRepository.flush();
|
||||
instanceLogRepository.saveAll(executeLogs);
|
||||
instanceLogRepository.flush();
|
||||
|
||||
// 2. 推入时间轮中等待调度执行
|
||||
jobInfos.forEach(jobInfoDO -> {
|
||||
|
@ -1,13 +1,22 @@
|
||||
package com.github.kfcfans.oms.server.web.controller;
|
||||
|
||||
import com.github.kfcfans.common.InstanceStatus;
|
||||
import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceDetail;
|
||||
import com.github.kfcfans.common.model.InstanceDetail;
|
||||
import com.github.kfcfans.oms.server.persistence.PageResult;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceService;
|
||||
import com.github.kfcfans.oms.server.web.response.InstanceLogVO;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
||||
/**
|
||||
@ -33,4 +42,20 @@ public class InstanceController {
|
||||
public ResultDTO<InstanceDetail> getRunningStatus(Long instanceId) {
|
||||
return ResultDTO.success(instanceService.getInstanceDetail(instanceId));
|
||||
}
|
||||
|
||||
@GetMapping("/list")
|
||||
public ResultDTO<PageResult<InstanceLogVO>> list(Long appId, int index, int pageSize) {
|
||||
|
||||
Page<InstanceLogDO> page = instanceService.listInstance(appId, index, pageSize);
|
||||
List<InstanceLogVO> content = page.getContent().stream().map(instanceLogDO -> {
|
||||
InstanceLogVO instanceLogVO = new InstanceLogVO();
|
||||
BeanUtils.copyProperties(instanceLogDO, instanceLogVO);
|
||||
instanceLogVO.setStatus(InstanceStatus.of(instanceLogDO.getStatus()).getDes());
|
||||
return instanceLogVO;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
PageResult<InstanceLogVO> pageResult = new PageResult<>(page);
|
||||
pageResult.setData(content);
|
||||
return ResultDTO.success(pageResult);
|
||||
}
|
||||
}
|
||||
|
@ -6,8 +6,9 @@ import com.github.kfcfans.common.ProcessorType;
|
||||
import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.server.common.constans.JobStatus;
|
||||
import com.github.kfcfans.oms.server.common.utils.CronExpression;
|
||||
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.PageResult;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
@ -15,8 +16,12 @@ import com.github.kfcfans.oms.server.service.DispatchService;
|
||||
import com.github.kfcfans.oms.server.service.IdGenerateService;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceService;
|
||||
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
|
||||
import com.github.kfcfans.oms.server.web.response.JobInfoVO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Sort;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
@ -27,6 +32,7 @@ import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 任务信息管理 Controller
|
||||
@ -47,7 +53,7 @@ public class JobController {
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private ExecuteLogRepository executeLogRepository;
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
|
||||
@PostMapping("/save")
|
||||
public ResultDTO<Void> saveJobInfo(ModifyJobInfoRequest request) throws Exception {
|
||||
@ -104,11 +110,28 @@ public class JobController {
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
|
||||
@GetMapping("/list")
|
||||
public ResultDTO<PageResult<JobInfoVO>> listJobs(Long appId, int index, int pageSize) {
|
||||
|
||||
Sort sort = Sort.by(Sort.Direction.DESC, "gmtCreate");
|
||||
PageRequest pageRequest = PageRequest.of(index, pageSize, sort);
|
||||
Page<JobInfoDO> jobInfoPage = jobInfoRepository.findByAppId(appId, pageRequest);
|
||||
List<JobInfoVO> jobInfoVOList = jobInfoPage.getContent().stream().map(jobInfoDO -> {
|
||||
JobInfoVO jobInfoVO = new JobInfoVO();
|
||||
BeanUtils.copyProperties(jobInfoDO, jobInfoVO);
|
||||
return jobInfoVO;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
PageResult<JobInfoVO> pageResult = new PageResult<>(jobInfoPage);
|
||||
pageResult.setData(jobInfoVOList);
|
||||
return ResultDTO.success(pageResult);
|
||||
}
|
||||
|
||||
/**
|
||||
* 立即运行JOB
|
||||
*/
|
||||
private void runJobImmediately(JobInfoDO jobInfoDO) {
|
||||
ExecuteLogDO executeLog = new ExecuteLogDO();
|
||||
InstanceLogDO executeLog = new InstanceLogDO();
|
||||
executeLog.setJobId(jobInfoDO.getId());
|
||||
executeLog.setAppId(jobInfoDO.getAppId());
|
||||
executeLog.setInstanceId(IdGenerateService.allocate());
|
||||
@ -117,7 +140,7 @@ public class JobController {
|
||||
executeLog.setGmtCreate(new Date());
|
||||
executeLog.setGmtModified(executeLog.getGmtCreate());
|
||||
|
||||
executeLogRepository.saveAndFlush(executeLog);
|
||||
instanceLogRepository.saveAndFlush(executeLog);
|
||||
dispatchService.dispatch(jobInfoDO, executeLog.getInstanceId(), 0);
|
||||
}
|
||||
|
||||
@ -141,7 +164,7 @@ public class JobController {
|
||||
if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) {
|
||||
return;
|
||||
}
|
||||
List<ExecuteLogDO> executeLogs = executeLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus);
|
||||
List<InstanceLogDO> executeLogs = instanceLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus);
|
||||
if (CollectionUtils.isEmpty(executeLogs)) {
|
||||
return;
|
||||
}
|
||||
|
@ -0,0 +1,58 @@
|
||||
package com.github.kfcfans.oms.server.web.controller;
|
||||
|
||||
import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.UserInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.UserInfoRepository;
|
||||
import com.github.kfcfans.oms.server.web.request.ModifyUserInfoRequest;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 用户信息控制层
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/12
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/user")
|
||||
public class UserInfoController {
|
||||
|
||||
@Resource
|
||||
private UserInfoRepository userInfoRepository;
|
||||
|
||||
@PostMapping("save")
|
||||
public ResultDTO<Void> save(ModifyUserInfoRequest request) {
|
||||
UserInfoDO userInfoDO = new UserInfoDO();
|
||||
BeanUtils.copyProperties(request, userInfoDO);
|
||||
userInfoDO.setGmtCreate(new Date());
|
||||
userInfoDO.setGmtModified(userInfoDO.getGmtCreate());
|
||||
userInfoRepository.saveAndFlush(userInfoDO);
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
|
||||
@GetMapping("list")
|
||||
public ResultDTO<List<UserItemVO>> list() {
|
||||
List<UserItemVO> result = userInfoRepository.findAll().stream().map(x -> new UserItemVO(x.getId(), x.getUsername())).collect(Collectors.toList());
|
||||
return ResultDTO.success(result);
|
||||
}
|
||||
|
||||
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static final class UserItemVO {
|
||||
private Long id;
|
||||
private String username;
|
||||
}
|
||||
}
|
@ -56,6 +56,14 @@ public class ModifyJobInfoRequest {
|
||||
private Integer instanceRetryNum;
|
||||
private Integer taskRetryNum;
|
||||
|
||||
/* ************************** 繁忙机器配置 ************************** */
|
||||
// 最低CPU核心数量,0代表不限
|
||||
private double minCpuCores;
|
||||
// 最低内存空间,单位 GB,0代表不限
|
||||
private double minMemorySpace;
|
||||
// 最低磁盘空间,单位 GB,0代表不限
|
||||
private double minDiskSpace;
|
||||
|
||||
// 1 正常运行,2 停止(不再调度)
|
||||
private Integer status;
|
||||
}
|
||||
|
@ -0,0 +1,23 @@
|
||||
package com.github.kfcfans.oms.server.web.request;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 创建/修改 UserInfo 请求
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/12
|
||||
*/
|
||||
@Data
|
||||
public class ModifyUserInfoRequest {
|
||||
|
||||
private Long id;
|
||||
|
||||
private String username;
|
||||
private String password;
|
||||
|
||||
// 手机号
|
||||
private String phone;
|
||||
// 邮箱地址
|
||||
private String email;
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
package com.github.kfcfans.oms.server.web.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* ExecuteLog 对外展示对象
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/12
|
||||
*/
|
||||
@Data
|
||||
public class InstanceLogVO {
|
||||
|
||||
// 任务ID
|
||||
private Long jobId;
|
||||
// 任务所属应用的ID,冗余提高查询效率
|
||||
private Long appId;
|
||||
// 任务实例ID
|
||||
private Long instanceId;
|
||||
|
||||
// 执行结果
|
||||
private String result;
|
||||
// 预计触发时间
|
||||
private Long expectedTriggerTime;
|
||||
// 实际触发时间
|
||||
private Long actualTriggerTime;
|
||||
// 结束时间
|
||||
private Long finishedTime;
|
||||
// TaskTracker地址
|
||||
private String taskTrackerAddress;
|
||||
|
||||
// 总共执行的次数(用于重试判断)
|
||||
private Long runningTimes;
|
||||
|
||||
private Date gmtCreate;
|
||||
private Date gmtModified;
|
||||
|
||||
/* ********** 不一致区域 ********** */
|
||||
private String status;
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
package com.github.kfcfans.oms.server.web.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* JobInfo 对外展示对象
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/12
|
||||
*/
|
||||
@Data
|
||||
public class JobInfoVO {
|
||||
|
||||
private Long id;
|
||||
|
||||
/* ************************** 任务基本信息 ************************** */
|
||||
// 任务名称
|
||||
private String jobName;
|
||||
// 任务描述
|
||||
private String jobDescription;
|
||||
// 任务所属的应用ID
|
||||
private Long appId;
|
||||
// 任务自带的参数
|
||||
private String jobParams;
|
||||
// 任务实例的参数(API触发专用)
|
||||
private String instanceParams;
|
||||
|
||||
/* ************************** 定时参数 ************************** */
|
||||
// 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY)
|
||||
private Integer timeExpressionType;
|
||||
// 时间表达式,CRON/NULL/LONG/LONG
|
||||
private String timeExpression;
|
||||
|
||||
/* ************************** 执行方式 ************************** */
|
||||
// 执行类型,单机/广播/MR
|
||||
private Integer executeType;
|
||||
// 执行器类型,Java/Shell
|
||||
private Integer processorType;
|
||||
// 执行器信息
|
||||
private String processorInfo;
|
||||
|
||||
/* ************************** 运行时配置 ************************** */
|
||||
// 最大同时运行任务数,默认 1
|
||||
private Integer maxInstanceNum;
|
||||
// 并发度,同时执行某个任务的最大线程数量
|
||||
private Integer concurrency;
|
||||
// 任务整体超时时间
|
||||
private Long instanceTimeLimit;
|
||||
|
||||
/* ************************** 重试配置 ************************** */
|
||||
private Integer instanceRetryNum;
|
||||
private Integer taskRetryNum;
|
||||
|
||||
// 1 正常运行,2 停止(不再调度)
|
||||
private Integer status;
|
||||
// 下一次调度时间
|
||||
private Long nextTriggerTime;
|
||||
|
||||
/* ************************** 繁忙机器配置 ************************** */
|
||||
// 最低CPU核心数量,0代表不限
|
||||
private double minCpuCores;
|
||||
// 最低内存空间,单位 GB,0代表不限
|
||||
private double minMemorySpace;
|
||||
// 最低磁盘空间,单位 GB,0代表不限
|
||||
private double minDiskSpace;
|
||||
|
||||
private Date gmtCreate;
|
||||
private Date gmtModified;
|
||||
}
|
@ -3,10 +3,10 @@ package com.github.kfcfans.oms.server.test;
|
||||
import com.github.kfcfans.common.utils.NetUtils;
|
||||
import com.github.kfcfans.oms.server.common.constans.JobStatus;
|
||||
import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository;
|
||||
import org.assertj.core.util.Lists;
|
||||
@ -33,7 +33,7 @@ public class RepositoryTest {
|
||||
@Resource
|
||||
private OmsLockRepository omsLockRepository;
|
||||
@Resource
|
||||
private ExecuteLogRepository executeLogRepository;
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
|
||||
/**
|
||||
* 需要证明批量写入失败后会回滚
|
||||
@ -58,17 +58,17 @@ public class RepositoryTest {
|
||||
|
||||
@Test
|
||||
public void testUpdate() {
|
||||
ExecuteLogDO updateEntity = new ExecuteLogDO();
|
||||
InstanceLogDO updateEntity = new InstanceLogDO();
|
||||
updateEntity.setId(22L);
|
||||
updateEntity.setActualTriggerTime(System.currentTimeMillis());
|
||||
updateEntity.setResult("hahaha");
|
||||
executeLogRepository.saveAndFlush(updateEntity);
|
||||
instanceLogRepository.saveAndFlush(updateEntity);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteLogUpdate() {
|
||||
executeLogRepository.update4Trigger(1586310414570L, 2, 100, System.currentTimeMillis(), "192.168.1.1", "NULL");
|
||||
executeLogRepository.update4FrequentJob(1586310419650L, 2, 200);
|
||||
instanceLogRepository.update4Trigger(1586310414570L, 2, 100, System.currentTimeMillis(), "192.168.1.1", "NULL");
|
||||
instanceLogRepository.update4FrequentJob(1586310419650L, 2, 200);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package com.github.kfcfans.oms.worker.actors;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import com.github.kfcfans.common.model.InstanceDetail;
|
||||
import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq;
|
||||
import com.github.kfcfans.common.request.ServerScheduleJobReq;
|
||||
import com.github.kfcfans.common.request.ServerStopInstanceReq;
|
||||
@ -150,10 +151,17 @@ public class TaskTrackerActor extends AbstractActor {
|
||||
* 查询任务实例运行状态
|
||||
*/
|
||||
private void onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) {
|
||||
AskResponse askResponse = new AskResponse();
|
||||
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
|
||||
if (taskTracker == null) {
|
||||
log.warn("[TaskTrackerActor] receive ServerQueryInstanceStatusReq({}) but system can't find TaskTracker.", req);
|
||||
return;
|
||||
askResponse.setSuccess(false);
|
||||
askResponse.setExtra("can't find TaskTracker");
|
||||
}else {
|
||||
InstanceDetail instanceDetail = taskTracker.fetchRunningStatus();
|
||||
askResponse.setSuccess(true);
|
||||
askResponse.setExtra(instanceDetail);
|
||||
}
|
||||
getSender().tell(askResponse, getSelf());
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import akka.pattern.Patterns;
|
||||
import com.github.kfcfans.common.ExecuteType;
|
||||
import com.github.kfcfans.common.InstanceStatus;
|
||||
import com.github.kfcfans.common.SystemInstanceResult;
|
||||
import com.github.kfcfans.common.model.InstanceDetail;
|
||||
import com.github.kfcfans.common.request.ServerScheduleJobReq;
|
||||
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
|
||||
import com.github.kfcfans.common.response.AskResponse;
|
||||
@ -57,6 +58,26 @@ public class CommonTaskTracker extends TaskTracker {
|
||||
scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InstanceDetail fetchRunningStatus() {
|
||||
|
||||
InstanceDetail detail = new InstanceDetail();
|
||||
// 填充基础信息
|
||||
detail.setActualTriggerTime(createTime);
|
||||
detail.setStatus(InstanceStatus.RUNNING.getDes());
|
||||
detail.setTaskTrackerAddress(OhMyWorker.getWorkerAddress());
|
||||
|
||||
// 填充详细信息
|
||||
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
|
||||
InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail();
|
||||
taskDetail.setSucceedTaskNum(holder.succeedNum);
|
||||
taskDetail.setFailedTaskNum(holder.failedNum);
|
||||
taskDetail.setTotalTaskNum(holder.getTotalTaskNum());
|
||||
detail.setExtra(taskDetail);
|
||||
|
||||
return detail;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 任务是否超时
|
||||
|
@ -5,6 +5,7 @@ import com.github.kfcfans.common.ExecuteType;
|
||||
import com.github.kfcfans.common.InstanceStatus;
|
||||
import com.github.kfcfans.common.RemoteConstant;
|
||||
import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.common.model.InstanceDetail;
|
||||
import com.github.kfcfans.common.request.ServerScheduleJobReq;
|
||||
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
|
||||
import com.github.kfcfans.oms.worker.OhMyWorker;
|
||||
@ -17,10 +18,13 @@ import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -98,6 +102,27 @@ public class FrequentTaskTracker extends TaskTracker {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public InstanceDetail fetchRunningStatus() {
|
||||
InstanceDetail detail = new InstanceDetail();
|
||||
// 填充基础信息
|
||||
detail.setActualTriggerTime(createTime);
|
||||
detail.setStatus(InstanceStatus.RUNNING.getDes());
|
||||
detail.setTaskTrackerAddress(OhMyWorker.getWorkerAddress());
|
||||
|
||||
List<InstanceDetail.SubInstanceDetail> history = Lists.newLinkedList();
|
||||
recentSubInstanceInfo.forEach((ignore, subInstanceInfo) -> {
|
||||
InstanceDetail.SubInstanceDetail subDetail = new InstanceDetail.SubInstanceDetail();
|
||||
BeanUtils.copyProperties(subInstanceInfo, subDetail);
|
||||
subDetail.setStatus(InstanceStatus.of(subInstanceInfo.status).getDes());
|
||||
|
||||
history.add(subDetail);
|
||||
});
|
||||
|
||||
detail.setExtra(history);
|
||||
return detail;
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务发射器(@Reference 饥荒->雪球发射器)
|
||||
*/
|
||||
@ -300,6 +325,7 @@ public class FrequentTaskTracker extends TaskTracker {
|
||||
SubInstanceInfo subInstanceInfo = recentSubInstanceInfo.get(subInstanceId);
|
||||
subInstanceInfo.status = success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV();
|
||||
subInstanceInfo.result = result;
|
||||
subInstanceInfo.finishedTime = System.currentTimeMillis();
|
||||
}
|
||||
// 删除数据库相关数据
|
||||
taskPersistenceService.deleteAllSubInstanceTasks(instanceId, subInstanceId);
|
||||
@ -310,9 +336,11 @@ public class FrequentTaskTracker extends TaskTracker {
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class SubInstanceInfo {
|
||||
private int status;
|
||||
private long startTime;
|
||||
private long finishedTime;
|
||||
private String result;
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package com.github.kfcfans.oms.worker.core.tracker.task;
|
||||
import akka.actor.ActorSelection;
|
||||
import com.github.kfcfans.common.RemoteConstant;
|
||||
import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.common.model.InstanceDetail;
|
||||
import com.github.kfcfans.common.request.ServerScheduleJobReq;
|
||||
import com.github.kfcfans.common.utils.CommonUtils;
|
||||
import com.github.kfcfans.oms.worker.OhMyWorker;
|
||||
@ -371,6 +372,10 @@ public abstract class TaskTracker {
|
||||
protected long runningNum;
|
||||
protected long failedNum;
|
||||
protected long succeedNum;
|
||||
|
||||
public long getTotalTaskNum() {
|
||||
return waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum + failedNum + succeedNum;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -378,4 +383,10 @@ public abstract class TaskTracker {
|
||||
* @param req 服务器调度任务实例运行请求
|
||||
*/
|
||||
abstract protected void initTaskTracker(ServerScheduleJobReq req);
|
||||
|
||||
/**
|
||||
* 查询任务实例的详细运行状态
|
||||
* @return 任务实例的详细运行状态
|
||||
*/
|
||||
abstract public InstanceDetail fetchRunningStatus();
|
||||
}
|
||||
|
@ -5,6 +5,8 @@ import com.github.kfcfans.oms.worker.OhMyWorker;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* ProcessorTracker 定时向 TaskTracker 上报健康状态
|
||||
*
|
||||
@ -13,7 +15,7 @@ import lombok.NoArgsConstructor;
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class ProcessorTrackerStatusReportReq {
|
||||
public class ProcessorTrackerStatusReportReq implements Serializable {
|
||||
|
||||
private Long instanceId;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user