finished the cron schedule

This commit is contained in:
tjq 2020-04-06 16:36:03 +08:00
parent 84203c5caa
commit f1b3edea62
51 changed files with 2272 additions and 165 deletions

View File

@ -17,4 +17,13 @@ public enum ExecuteType {
MAP_REDUCE(3);
int v;
public static ExecuteType of(int v) {
for (ExecuteType type : values()) {
if (type.v == v) {
return type;
}
}
throw new IllegalArgumentException("unknown ExecuteType of " + v);
}
}

View File

@ -4,7 +4,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* description
* 任务运行状态
*
* @author tjq
* @since 2020/3/17
@ -12,10 +12,14 @@ import lombok.Getter;
@Getter
@AllArgsConstructor
public enum InstanceStatus {
RUNNING(3, "运行中"),
SUCCEED(4, "运行成功"),
FAILED(5, "运行失败");
private int value;
WAITING_DISPATCH(1, "等待任务派发任务处理Server时间轮中"),
WAITING_WORKER_RECEIVE(2, "Server已完成任务派发等待Worker接收"),
RUNNING(3, "Worker接收成功正在运行任务"),
FAILED(4, "任务运行失败"),
SUCCEED(5, "任务运行成功"),
STOPPED(10, "任务被手动停止");
private int v;
private String des;
}

View File

@ -17,4 +17,13 @@ public enum ProcessorType {
private int v;
private String des;
public static ProcessorType of(int v) {
for (ProcessorType type : values()) {
if (type.v == v) {
return type;
}
}
throw new IllegalArgumentException("unknown ProcessorType of " + v);
}
}

View File

@ -32,6 +32,8 @@ public class SystemMetrics implements Serializable, Comparable<SystemMetrics> {
// 缓存分数
private int score;
public static final int MIN_SCORE = 1;
@Override
public int compareTo(SystemMetrics that) {
return this.calculateScore() - that.calculateScore();
@ -53,7 +55,7 @@ public class SystemMetrics implements Serializable, Comparable<SystemMetrics> {
// 最低运行标准1G磁盘 & 0.5G内存 & 一个可用的CPU核心
if (availableDisk < 1 || availableMemory < 0.5 || availableCPUCores < 1) {
score = 1;
score = MIN_SCORE;
} else {
// 磁盘只需要满足最低标准即可
score = (int) (availableMemory * 2 + availableCPUCores);

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.common.request;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* 服务端调度任务请求一次任务处理的入口
@ -13,18 +14,16 @@ import java.io.Serializable;
@Data
public class ServerScheduleJobReq implements Serializable {
// 调度的服务器地址默认通讯目标
private String serverAddress;
// 可用处理器地址可能多值逗号分隔
private String allWorkerAddress;
private List<String> allWorkerAddress;
/* *********************** 任务相关属性 *********************** */
/**
* 基础信息
*/
private String jobId;
private String instanceId;
private Long jobId;
private Long instanceId;
/**
* 任务执行处理器信息

View File

@ -13,8 +13,8 @@ import java.io.Serializable;
@Data
public class TaskTrackerReportInstanceStatusReq implements Serializable {
private String jobId;
private String instanceId;
private Long jobId;
private Long instanceId;
private int instanceStatus;

View File

@ -18,4 +18,8 @@ import java.io.Serializable;
public class AskResponse implements Serializable {
private boolean success;
private Object extra;
public AskResponse(boolean success) {
this.success = success;
}
}

View File

@ -0,0 +1,22 @@
package com.github.kfcfans.oms.server.common.constans;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 任务状态
*
* @author tjq
* @since 2020/4/6
*/
@Getter
@AllArgsConstructor
public enum JobStatus {

    ENABLE(1),
    STOPPED(2),
    DELETED(99);

    // numeric value persisted in the database (JobInfoDO.status)
    private int v;

    /**
     * Resolves a {@code JobStatus} from its persisted numeric value.
     * Mirrors the {@code of(int)} factories on ExecuteType / ProcessorType
     * so all enum lookups in the codebase share one convention.
     *
     * @param v the stored numeric value
     * @return the matching JobStatus
     * @throws IllegalArgumentException if no constant maps to {@code v}
     */
    public static JobStatus of(int v) {
        for (JobStatus status : values()) {
            if (status.v == v) {
                return status;
            }
        }
        throw new IllegalArgumentException("unknown JobStatus of " + v);
    }
}

View File

@ -0,0 +1,93 @@
package com.github.kfcfans.oms.server.common.utils.snowflake;
/**
* Twitter SnowFlakeScala -> Java
*
* @author tjq
* @since 2020/4/6
*/
public class SnowFlakeIdGenerator {
/**
* 起始的时间戳
*/
private final static long START_STAMP = 1480166465631L;
/**
* 每一部分占用的位数
*/
private final static long SEQUENCE_BIT = 12; //序列号占用的位数
private final static long MACHINE_BIT = 5; //机器标识占用的位数
private final static long DATA_CENTER_BIT = 5;//数据中心占用的位数
/**
* 每一部分的最大值
*/
private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);
private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);
private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
/**
* 每一部分向左的位移
*/
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
private long dataCenterId; //数据中心
private long machineId; //机器标识
private long sequence = 0L; //序列号
private long lastTimestamp = -1L;//上一次时间戳
public SnowFlakeIdGenerator(long dataCenterId, long machineId) {
if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
throw new IllegalArgumentException("dataCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0");
}
if (machineId > MAX_MACHINE_NUM || machineId < 0) {
throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
}
this.dataCenterId = dataCenterId;
this.machineId = machineId;
}
/**
* 产生下一个ID
*/
public synchronized long nextId() {
long currStamp = getNewStamp();
if (currStamp < lastTimestamp) {
throw new RuntimeException("Clock moved backwards. Refusing to generate id");
}
if (currStamp == lastTimestamp) {
//相同毫秒内序列号自增
sequence = (sequence + 1) & MAX_SEQUENCE;
//同一毫秒的序列数已经达到最大
if (sequence == 0L) {
currStamp = getNextMill();
}
} else {
//不同毫秒内序列号置为0
sequence = 0L;
}
lastTimestamp = currStamp;
return (currStamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分
| dataCenterId << DATA_CENTER_LEFT //数据中心部分
| machineId << MACHINE_LEFT //机器标识部分
| sequence; //序列号部分
}
private long getNextMill() {
long mill = getNewStamp();
while (mill <= lastTimestamp) {
mill = getNewStamp();
}
return mill;
}
private long getNewStamp() {
return System.currentTimeMillis();
}
}

View File

@ -26,6 +26,8 @@ public class OhMyServer {
@Getter
private static String actorSystemAddress;
private static final String AKKA_PATH = "akka://%s@%s/user/%s";
public static void init() {
// 1. 启动 ActorSystem
@ -51,7 +53,12 @@ public class OhMyServer {
* @return ActorSelection
*/
public static ActorSelection getServerActor(String address) {
String path = String.format("akka://%s@%s/user/%s", RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_ACTOR_NAME);
String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
public static ActorSelection getTaskTrackerActor(String address) {
String path = String.format(AKKA_PATH, RemoteConstant.ACTOR_SYSTEM_NAME, address, RemoteConstant.Task_TRACKER_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.server.core.akka;
import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
@ -35,7 +36,19 @@ public class ServerActor extends AbstractActor {
getSender().tell(askResponse, getSelf());
}
/**
* 处理 Worker 的心跳请求
* @param heartbeat 心跳包
*/
private void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) {
WorkerManagerService.updateStatus(heartbeat);
}
/**
* 处理 instance 状态
* @param req 任务实例的状态上报请求
*/
private void onReceive(TaskTrackerReportInstanceStatusReq req) {
}
}

View File

@ -23,6 +23,7 @@ public class AppInfoDO {
private String appName;
private String description;
// 当前负责该 appName 旗下任务调度的server地址IP:Port
private String currentServer;
private Date gmtCreate;

View File

@ -1,5 +1,7 @@
package com.github.kfcfans.oms.server.persistence.model;
import lombok.Data;
import javax.persistence.*;
import java.util.Date;
@ -9,9 +11,10 @@ import java.util.Date;
* @author tjq
* @since 2020/3/30
*/
@Data
@Entity
@Table(name = "job_log")
public class JobLogDO {
@Table(name = "execute_log", indexes = {@Index(columnList = "jobId")})
public class ExecuteLogDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@ -20,13 +23,19 @@ public class JobLogDO {
// 任务ID
private Long jobId;
// 任务实例ID
private String instanceId;
// 任务状态 运行中/成功/失败...
private Long instanceId;
/**
* 任务状态 {@link com.github.kfcfans.common.InstanceStatus}
*/
private int status;
// 执行结果
private String result;
// 耗时
private Long usedTime;
// 预计触发时间
private Long expectedTriggerTime;
// 实际触发时间
private Long actualTriggerTime;
private Date gmtCreate;
private Date gmtModified;

View File

@ -14,7 +14,7 @@ import java.util.Date;
*/
@Data
@Entity
@Table(name = "job_info")
@Table(name = "job_info", indexes = {@Index(columnList = "appId")})
public class JobInfoDO {
@ -29,6 +29,8 @@ public class JobInfoDO {
private String jobDescription;
// 任务所属的应用ID
private Long appId;
// 任务自带的参数
private String jobParams;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
@ -45,7 +47,7 @@ public class JobInfoDO {
private String processorInfo;
/* ************************** 运行时配置 ************************** */
// 最大同时运行任务数
// 最大同时运行任务数默认 1
private Integer maxInstanceNum;
// 并发度同时执行某个任务的最大线程数量
private Integer concurrency;
@ -54,6 +56,10 @@ public class JobInfoDO {
// 任务的每一个Task超时时间
private Long taskTimeLimit;
/* ************************** 重试配置 ************************** */
private Integer instanceRetryNum;
private Integer taskRetryNum;
// 1 正常运行2 停止不再调度
private Integer status;
// 下一次调度时间

View File

@ -3,6 +3,8 @@ package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
/**
* AppInfo 数据访问层
*
@ -12,4 +14,10 @@ import org.springframework.data.jpa.repository.JpaRepository;
public interface AppInfoRepository extends JpaRepository<AppInfoDO, Long> {
AppInfoDO findByAppName(String appName);
/**
* 根据 currentServer 查询 appId
* 其实只需要 id处于性能考虑可以直接写SQL只返回ID
*/
List<AppInfoDO> findAllByCurrentServer(String currentServer);
}

View File

@ -0,0 +1,23 @@
package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import java.util.List;
/**
* JobLog 数据访问层
*
* @author tjq
* @since 2020/4/1
*/
public interface ExecuteLogRepository extends JpaRepository<ExecuteLogDO, Long> {
long countByJobIdAndStatusIn(long jobId, List<Integer> status);
@Query(value = "update execute_log set status = ?2, result = ?3 where instance_id = ?1", nativeQuery = true)
int updateStatusAndLog(long instanceId, int status, String result);
List<ExecuteLogDO> findByJobIdIn(List<Long> jobIds);
}

View File

@ -14,7 +14,7 @@ import java.util.List;
public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
List<JobInfoDO> findByAppIdInAndNextTriggerTimeLessThanEqual(List<Long> appIds, Long time);
List<JobInfoDO> findByAppIdInAndStatusAndTimeExpressionAndNextTriggerTimeLessThanEqual(List<Long> appIds, int status, int timeExpressionType, long time);
List<JobInfoDO> findByAppIdAndNextTriggerTimeLessThanEqual(Long appId, Long time);
List<JobInfoDO> findByAppIdInAndStatusAndTimeExpression(List<Long> appIds, int status, int timeExpressionType);
}

View File

@ -1,16 +0,0 @@
package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.JobLogDO;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* JobLog 数据访问层
*
* @author tjq
* @since 2020/4/1
*/
public interface JobLogRepository extends JpaRepository<JobLogDO, Long> {
long countByJobIdAndStatus(Long jobId, Integer status);
}

View File

@ -1,10 +1,23 @@
package com.github.kfcfans.oms.server.service;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.JobLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import static com.github.kfcfans.common.InstanceStatus.*;
/**
* 派送服务
@ -12,16 +25,72 @@ import javax.annotation.Resource;
* @author tjq
* @since 2020/4/5
*/
@Slf4j
@Service
public class DispatchService {
@Resource
private JobLogRepository jobLogRepository;
private ExecuteLogRepository executeLogRepository;
public void dispatch(JobInfoDO jobInfo) {
// 前三个状态都视为运行中
private static final List<Integer> runningStatus = Lists.newArrayList(WAITING_DISPATCH.getV(), WAITING_WORKER_RECEIVE.getV(), RUNNING.getV());
// 1. 查询当前运行的实例数
private static final String FAILED_REASON = "%d instance is running";
private static final String NO_WORKER_REASON = "no worker available";
private static final String EMPTY_RESULT = "";
public void dispatch(JobInfoDO jobInfo, long instanceId) {
log.debug("[DispatchService] start to dispatch job -> {}.", jobInfo);
// 查询当前运行的实例数
long runningInstanceCount = executeLogRepository.countByJobIdAndStatusIn(jobInfo.getId(), runningStatus);
// 超出最大同时运行限制不执行调度
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(FAILED_REASON, runningInstanceCount);
log.warn("[DispatchService] cancel dispatch job({}) due to too much instance(num={}) is running.", jobInfo, runningInstanceCount);
executeLogRepository.updateStatusAndLog(instanceId, FAILED.getV(), result);
return;
}
// 获取 Worker
String taskTrackerAddress = WorkerManagerService.chooseBestWorker(jobInfo.getAppId());
List<String> allAvailableWorker = WorkerManagerService.getAllAvailableWorker(jobInfo.getAppId());
if (StringUtils.isEmpty(taskTrackerAddress)) {
log.warn("[DispatchService] cancel dispatch job({}) due to no worker available.", jobInfo);
executeLogRepository.updateStatusAndLog(instanceId, FAILED.getV(), NO_WORKER_REASON);
return;
}
// 消除非原子操作带来的潜在不一致
allAvailableWorker.remove(taskTrackerAddress);
allAvailableWorker.add(taskTrackerAddress);
// 构造请求
ServerScheduleJobReq req = new ServerScheduleJobReq();
req.setAllWorkerAddress(allAvailableWorker);
req.setJobId(jobInfo.getId());
req.setInstanceId(instanceId);
req.setExecuteType(ExecuteType.of(jobInfo.getExecuteType()).name());
req.setProcessorType(ProcessorType.of(jobInfo.getProcessorType()).name());
req.setProcessorInfo(jobInfo.getProcessorInfo());
req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit());
req.setTaskTimeoutMS(jobInfo.getTaskTimeLimit());
req.setJobParams(jobInfo.getJobParams());
req.setThreadConcurrency(jobInfo.getConcurrency());
req.setTaskRetryNum(jobInfo.getTaskRetryNum());
// 发送请求不可靠需要一个后台线程定期轮询状态
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress);
taskTrackerActor.tell(req, null);
// 修改状态
executeLogRepository.updateStatusAndLog(instanceId, WAITING_WORKER_RECEIVE.getV(), EMPTY_RESULT);
}
}

View File

@ -0,0 +1,16 @@
package com.github.kfcfans.oms.server.service;
/**
* 唯一ID生成服务
*
* @author tjq
* @since 2020/4/6
*/
public class IdGenerateService {
public static Long allocate() {
// TODO换成合适的分布式ID生成算法
return System.currentTimeMillis();
}
}

View File

@ -4,10 +4,12 @@ import com.github.kfcfans.common.model.SystemMetrics;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* 管理Worker集群状态
@ -61,14 +63,40 @@ public class ClusterStatusHolder {
entryList.sort((o1, o2) -> o2.getValue().calculateScore() - o1.getValue().calculateScore());
for (Map.Entry<String, SystemMetrics> entry : address2Metrics.entrySet()) {
long lastActiveTime = address2ActiveTime.getOrDefault(entry.getKey(), -1L);
long timeout = System.currentTimeMillis() - lastActiveTime;
if (timeout < WORKER_TIMEOUT_MS) {
return entry.getKey();
String address = entry.getKey();
if (available(address)) {
return address;
}
}
log.warn("[ClusterStatusHolder] no worker available for {}, worker status is {}.", appName, address2Metrics);
return null;
}
/**
* 获取当前所有可用的 Worker
* @return List<Worker>
*/
public List<String> getAllAvailableWorker() {
List<String> workers = Lists.newLinkedList();
address2Metrics.forEach((address, ignore) -> {
if (available(address)) {
workers.add(address);
}
});
return workers;
}
private boolean available(String address) {
SystemMetrics metrics = address2Metrics.get(address);
if (metrics.calculateScore() == SystemMetrics.MIN_SCORE) {
return false;
}
Long lastActiveTime = address2ActiveTime.getOrDefault(address, -1L);
long timeout = System.currentTimeMillis() - lastActiveTime;
return timeout < WORKER_TIMEOUT_MS;
}
}

View File

@ -72,10 +72,7 @@ public class ServerSelectService {
try {
// 可能上一台机器已经完成了Server选举需要再次判断
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> {
log.error("[ServerSelectService] impossible, unless we just lost our database.");
return null;
});
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
if (isActive(appInfo.getCurrentServer())) {
return appInfo.getCurrentServer();
}

View File

@ -3,10 +3,10 @@ package com.github.kfcfans.oms.server.service.ha;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* Worker 管理服务
@ -17,7 +17,8 @@ import java.util.Map;
@Slf4j
public class WorkerManagerService {
private static final Map<Long, ClusterStatusHolder> appName2ClusterStatus = Maps.newConcurrentMap();
// 存储Worker健康信息appId -> ClusterStatusHolder
private static final Map<Long, ClusterStatusHolder> appId2ClusterStatus = Maps.newConcurrentMap();
/**
* 更新状态
@ -26,7 +27,7 @@ public class WorkerManagerService {
public static void updateStatus(WorkerHeartbeat heartbeat) {
Long appId = heartbeat.getAppId();
String appName = heartbeat.getAppName();
ClusterStatusHolder clusterStatusHolder = appName2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
clusterStatusHolder.updateStatus(heartbeat);
}
@ -36,7 +37,7 @@ public class WorkerManagerService {
* @return Worker的地址null代表没有可用的Worker
*/
public static String chooseBestWorker(Long appId) {
ClusterStatusHolder clusterStatusHolder = appName2ClusterStatus.get(appId);
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
if (clusterStatusHolder == null) {
log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId);
return null;
@ -45,11 +46,24 @@ public class WorkerManagerService {
}
/**
* 获取当前该 Server 管理的所有应用ID
* @return List<AppId>
* 获取当前所有可用的Worker地址
*/
public static List<Long> listAppIds() {
return Lists.newArrayList(appName2ClusterStatus.keySet());
public static List<String> getAllAvailableWorker(Long appId) {
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
if (clusterStatusHolder == null) {
log.warn("[WorkerManagerService] can't find any worker for {} yet.", appId);
return Collections.emptyList();
}
return clusterStatusHolder.getAllAvailableWorker();
}
/**
* 清理不需要的worker信息
* @param usingAppIds 需要维护的appId其余的数据将被删除
*/
public static void clean(List<Long> usingAppIds) {
Set<Long> keys = Sets.newHashSet(usingAppIds);
appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
}
}

View File

@ -1,19 +1,34 @@
package com.github.kfcfans.oms.server.service.schedule;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.core.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.JobLogRepository;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import com.github.kfcfans.oms.server.service.DispatchService;
import com.github.kfcfans.oms.server.service.IdGenerateService;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import com.github.kfcfans.oms.server.service.lock.LockService;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
@ -29,46 +44,163 @@ public class JobScheduleService {
private static final int MAX_BATCH_NUM = 10;
@Resource
private LockService lockService;
private DispatchService dispatchService;
@Resource
private AppInfoRepository appInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private JobLogRepository jobLogRepository;
private ExecuteLogRepository executeLogRepository;
private static final String SCHEDULE_LOCK = "schedule_lock_%d";
private static final long SCHEDULE_RATE = 10000;
private static final long SCHEDULE_RATE = 5000;
@Scheduled(fixedRate = SCHEDULE_RATE)
private void getJob() {
List<Long> allAppIds = WorkerManagerService.listAppIds();
if (CollectionUtils.isEmpty(allAppIds)) {
public void timingSchedule() {
Stopwatch stopwatch = Stopwatch.createStarted();
// 先查询DB查看本机需要负责的任务
List<AppInfoDO> allAppInfos = appInfoRepository.findAllByCurrentServer(OhMyServer.getActorSystemAddress());
if (CollectionUtils.isEmpty(allAppInfos)) {
log.info("[JobScheduleService] current server has no app's job to schedule.");
return;
}
List<Long> allAppIds = allAppInfos.stream().map(AppInfoDO::getId).collect(Collectors.toList());
long timeThreshold = System.currentTimeMillis() + 2 * SCHEDULE_RATE;
Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> {
List<String> lockNames = partAppIds.stream().map(JobScheduleService::genLock).collect(Collectors.toList());
// 1. 先批量获取锁获取不到就改成单个循环模式
boolean batchLock = lockService.batchLock(lockNames);
if (!batchLock) {
}else {
try {
List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndNextTriggerTimeLessThanEqual(partAppIds, timeThreshold);
scheduleCornJob(allAppIds);
}catch (Exception e) {
log.error("[JobScheduleService] schedule cron job failed.", e);
}
log.info("[JobScheduleService] finished cron schedule, using time {}.", stopwatch);
stopwatch.reset();
// 顺序先推入进时间轮 -> 写jobLog表 -> 更新nextTriggerTime原则宁可重复执行也不能不调度
try {
scheduleFrequentJob(allAppIds);
}catch (Exception e) {
log.error("[JobScheduleService] schedule frequent job failed.", e);
}
log.info("[JobScheduleService] finished frequent schedule, using time {}.", stopwatch);
stopwatch.stop();
}
/**
* 调度 CRON 表达式类型的任务
*/
private void scheduleCornJob(List<Long> appIds) {
// 清理不需要维护的数据
WorkerManagerService.clean(appIds);
long nowTime = System.currentTimeMillis();
long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
Lists.partition(appIds, MAX_BATCH_NUM).forEach(partAppIds -> {
try {
// 查询条件任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行
List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold);
// 1. 批量写日志表
Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
List<ExecuteLogDO> executeLogs = Lists.newLinkedList();
jobInfos.forEach(jobInfoDO -> {
ExecuteLogDO executeLog = new ExecuteLogDO();
executeLog.setJobId(jobInfoDO.getId());
executeLog.setInstanceId(IdGenerateService.allocate());
executeLog.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
executeLog.setExpectedTriggerTime(jobInfoDO.getNextTriggerTime());
executeLog.setGmtCreate(new Date());
executeLog.setGmtModified(executeLog.getGmtCreate());
executeLogs.add(executeLog);
jobId2InstanceId.put(executeLog.getJobId(), executeLog.getInstanceId());
});
executeLogRepository.saveAll(executeLogs);
executeLogRepository.flush();
// 2. 推入时间轮中等待调度执行
jobInfos.forEach(jobInfoDO -> {
long targetTriggerTime = jobInfoDO.getNextTriggerTime();
long delay = 0;
if (targetTriggerTime < nowTime) {
log.warn("[JobScheduleService] Job({}) was delayed.", jobInfoDO);
}else {
delay = targetTriggerTime - nowTime;
}
HashedWheelTimerHolder.TIMER.schedule(() -> {
dispatchService.dispatch(jobInfoDO, jobId2InstanceId.get(jobInfoDO.getId()));
}, delay, TimeUnit.MILLISECONDS);
});
// 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms
Date now = new Date();
List<JobInfoDO> updatedJobInfos = Lists.newLinkedList();
jobInfos.forEach(jobInfoDO -> {
try {
CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression());
Date nextTriggerTime = cronExpression.getNextValidTimeAfter(now);
JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfoDO, updatedJobInfo);
updatedJobInfo.setNextTriggerTime(nextTriggerTime.getTime());
updatedJobInfo.setGmtModified(now);
updatedJobInfos.add(updatedJobInfo);
} catch (Exception e) {
log.error("[JobScheduleService] calculate next trigger time for job({}) failed.", jobInfoDO, e);
}
});
jobInfoRepository.saveAll(updatedJobInfos);
jobInfoRepository.flush();
}catch (Exception e) {
}
log.error("[JobScheduleService] schedule job failed.", e);
}
});
}
private static String genLock(Long appId) {
return String.format(SCHEDULE_LOCK, appId);
/**
* 调度 FIX_RATE FIX_DELAY 的任务
*/
private void scheduleFrequentJob(List<Long> appIds) {
List<JobInfoDO> fixDelayJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpression(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_DELAY.getV());
List<JobInfoDO> fixRateJobs = jobInfoRepository.findByAppIdInAndStatusAndTimeExpression(appIds, JobStatus.ENABLE.getV(), TimeExpressionType.FIX_RATE.getV());
List<Long> jobIds = Lists.newLinkedList();
Map<Long, JobInfoDO> jobId2JobInfo = Maps.newHashMap();
Consumer<JobInfoDO> consumer = jobInfo -> {
jobIds.add(jobInfo.getId());
jobId2JobInfo.put(jobInfo.getId(), jobInfo);
};
fixDelayJobs.forEach(consumer);
fixRateJobs.forEach(consumer);
if (CollectionUtils.isEmpty(jobIds)) {
log.debug("[JobScheduleService] no frequent job need to schedule.");
return;
}
// 查询 ExecuteLog 不存在或非运行状态则重新调度
List<ExecuteLogDO> executeLogDOS = executeLogRepository.findByJobIdIn(jobIds);
executeLogDOS.forEach(executeLogDO -> {
if (executeLogDO.getStatus() == InstanceStatus.RUNNING.getV()) {
jobId2JobInfo.remove(executeLogDO.getJobId());
}
});
// 重新Dispatch
jobId2JobInfo.values().forEach(jobInfoDO -> {
});
}
}

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
import com.github.kfcfans.oms.server.common.utils.CronExpression;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.web.ResultDTO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
@ -14,6 +15,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
/**
* 任务信息管理 Controller
@ -29,15 +31,22 @@ public class JobController {
private JobInfoRepository jobInfoRepository;
@PostMapping("/save")
public ResultDTO<Void> saveJobInfo(ModifyJobInfoRequest request) {
public ResultDTO<Void> saveJobInfo(ModifyJobInfoRequest request) throws Exception {
JobInfoDO jobInfoDO = new JobInfoDO();
BeanUtils.copyProperties(request, jobInfoDO);
// 拷贝枚举值
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(request.getTimeExpression());
jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV());
jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV());
jobInfoDO.setTimeExpressionType(TimeExpressionType.valueOf(request.getTimeExpression()).getV());
jobInfoDO.setTimeExpressionType(timeExpressionType.getV());
// 计算下次调度时间
if (timeExpressionType == TimeExpressionType.CRON) {
CronExpression cronExpression = new CronExpression(request.getTimeExpression());
Date nextValidTime = cronExpression.getNextValidTimeAfter(new Date());
jobInfoDO.setNextTriggerTime(nextValidTime.getTime());
}
jobInfoRepository.saveAndFlush(jobInfoDO);
return ResultDTO.success(null);

View File

@ -11,6 +11,8 @@ import lombok.Data;
@Data
public class ModifyJobInfoRequest {
// null -> 插入否则为更新
private Long id;
/* ************************** 任务基本信息 ************************** */
// 任务名称
private String jobName;
@ -20,6 +22,8 @@ public class ModifyJobInfoRequest {
private Long appId;
// 任务分组名称仅用于前端展示的分组
private String groupName;
// 任务自带的参数
private String jobParams;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
@ -36,6 +40,7 @@ public class ModifyJobInfoRequest {
// 执行器信息
private String processorInfo;
/* ************************** 运行时配置 ************************** */
// 最大同时运行任务数
private Integer maxInstanceNum;
@ -45,4 +50,11 @@ public class ModifyJobInfoRequest {
private Long instanceTimeLimit;
// 任务的每一个Task超时时间
private Long taskTimeLimit;
/* ************************** 重试配置 ************************** */
private Integer instanceRetryNum;
private Integer taskRetryNum;
// 1 正常运行2 停止不再调度
private Integer status;
}

View File

@ -61,4 +61,9 @@ public class UtilsTest {
Thread.sleep(277777777);
}
@Test
public void testCronExpression() {
}
}

View File

@ -30,8 +30,8 @@ public class ProcessorTrackerActor extends AbstractActor {
* 处理来自TaskTracker的task执行请求
*/
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
String jobId = req.getInstanceInfo().getJobId();
String instanceId = req.getInstanceInfo().getInstanceId();
Long jobId = req.getInstanceInfo().getJobId();
Long instanceId = req.getInstanceInfo().getInstanceId();
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> {
ProcessorTracker pt = new ProcessorTracker(req);
log.info("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) success.", jobId, instanceId);
@ -50,7 +50,7 @@ public class ProcessorTrackerActor extends AbstractActor {
private void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) {
String instanceId = req.getInstanceId();
Long instanceId = req.getInstanceId();
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId);
if (processorTracker == null) {
log.warn("[ProcessorTrackerActor] ProcessorTracker for instance(instanceId={}) already destroyed.", instanceId);

View File

@ -126,7 +126,7 @@ public class TaskTrackerActor extends AbstractActor {
* 服务器任务调度处理器
*/
private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) {
String instanceId = req.getInstanceId();
Long instanceId = req.getInstanceId();
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(instanceId);
if (taskTracker != null) {

View File

@ -1,13 +0,0 @@
package com.github.kfcfans.oms.worker.common.constants;
import com.google.common.base.Splitter;
/**
* splitter & joiner
*
* @author tjq
* @since 2020/3/17
*/
public class CommonSJ {
public static final Splitter commaSplitter = Splitter.on(",");
}

View File

@ -44,7 +44,7 @@ public class ProcessorRunnable implements Runnable {
public void innerRun() {
String taskId = task.getTaskId();
String instanceId = task.getInstanceId();
Long instanceId = task.getInstanceId();
log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());

View File

@ -1,6 +1,5 @@
package com.github.kfcfans.oms.worker.core.ha;
import com.github.kfcfans.oms.worker.common.constants.CommonSJ;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -18,12 +17,10 @@ public class ProcessorTrackerStatusHolder {
private final Map<String, ProcessorTrackerStatus> ip2Status;
public ProcessorTrackerStatusHolder(String allWorkerAddress) {
public ProcessorTrackerStatusHolder(List<String> allWorkerAddress) {
ip2Status = Maps.newConcurrentMap();
List<String> addressList = CommonSJ.commaSplitter.splitToList(allWorkerAddress);
addressList.forEach(ip -> {
allWorkerAddress.forEach(ip -> {
ProcessorTrackerStatus pts = new ProcessorTrackerStatus();
pts.init(ip);
ip2Status.put(ip, pts);

View File

@ -33,7 +33,7 @@ public class ProcessorTracker {
// 任务实例信息
private InstanceInfo instanceInfo;
// 冗余 instanceId方便日志
private String instanceId;
private Long instanceId;
private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef;

View File

@ -14,23 +14,23 @@ import java.util.function.Function;
*/
public class ProcessorTrackerPool {
private static final Map<String, ProcessorTracker> instanceId2ProcessorTracker = Maps.newConcurrentMap();
private static final Map<Long, ProcessorTracker> instanceId2ProcessorTracker = Maps.newConcurrentMap();
/**
* 获取 ProcessorTracker如果不存在则创建
*/
public static ProcessorTracker getProcessorTracker(String instanceId, Function<String, ProcessorTracker> creator) {
public static ProcessorTracker getProcessorTracker(Long instanceId, Function<Long, ProcessorTracker> creator) {
return instanceId2ProcessorTracker.computeIfAbsent(instanceId, creator);
}
/**
* 获取 ProcessorTracker
*/
public static ProcessorTracker getProcessorTracker(String instanceId) {
public static ProcessorTracker getProcessorTracker(Long instanceId) {
return instanceId2ProcessorTracker.get(instanceId);
}
public static void removeProcessorTracker(String instanceId) {
public static void removeProcessorTracker(Long instanceId) {
instanceId2ProcessorTracker.remove(instanceId);
}
}

View File

@ -92,7 +92,7 @@ public class TaskTracker {
* 更新任务状态
* 任务状态机只允许数字递增
*/
public void updateTaskStatus(String instanceId, String taskId, int newStatus, @Nullable String result) {
public void updateTaskStatus(Long instanceId, String taskId, int newStatus, @Nullable String result) {
boolean updateResult;
TaskStatus nTaskStatus = TaskStatus.of(newStatus);
@ -217,7 +217,7 @@ public class TaskTracker {
CommonUtils.executeIgnoreException(() -> scheduledPool.shutdownNow());
// 1. 通知 ProcessorTracker 释放资源
String instanceId = instanceInfo.getInstanceId();
Long instanceId = instanceInfo.getInstanceId();
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
stopRequest.setInstanceId(instanceId);
ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> {
@ -257,7 +257,7 @@ public class TaskTracker {
}
Stopwatch stopwatch = Stopwatch.createStarted();
String instanceId = instanceInfo.getInstanceId();
Long instanceId = instanceInfo.getInstanceId();
// 1. 获取可以派发任务的 ProcessorTracker
List<String> availablePtIps = ptStatusHolder.getAvailableProcessorTrackers();
@ -323,7 +323,7 @@ public class TaskTracker {
private void innerRun() {
final String instanceId = instanceInfo.getInstanceId();
Long instanceId = instanceInfo.getInstanceId();
// 1. 查询统计信息
Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId);
@ -396,7 +396,7 @@ public class TaskTracker {
boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
req.setResult(resultTask.getResult());
req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getValue() : InstanceStatus.FAILED.getValue());
req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
CompletionStage<Object> askCS = Patterns.ask(serverActor, req, Duration.ofMillis(TIME_OUT_MS));
@ -422,7 +422,7 @@ public class TaskTracker {
}
// 4. 未完成上报状态
req.setInstanceStatus(InstanceStatus.RUNNING.getValue());
req.setInstanceStatus(InstanceStatus.RUNNING.getV());
serverActor.tell(req, null);
// 5.1 定期检查 -> 重试派发后未确认的任务

View File

@ -13,20 +13,20 @@ import java.util.function.Function;
*/
public class TaskTrackerPool {
private static final Map<String, TaskTracker> instanceId2TaskTracker = Maps.newConcurrentMap();
private static final Map<Long, TaskTracker> instanceId2TaskTracker = Maps.newConcurrentMap();
/**
* 获取 ProcessorTracker如果不存在则创建
*/
public static TaskTracker getTaskTrackerPool(String instanceId) {
public static TaskTracker getTaskTrackerPool(Long instanceId) {
return instanceId2TaskTracker.get(instanceId);
}
public static void remove(String instanceId) {
public static void remove(Long instanceId) {
instanceId2TaskTracker.remove(instanceId);
}
public static void atomicCreateTaskTracker(String instanceId, Function<String, TaskTracker> creator) {
public static void atomicCreateTaskTracker(Long instanceId, Function<Long, TaskTracker> creator) {
instanceId2TaskTracker.computeIfAbsent(instanceId, creator);
}

View File

@ -15,8 +15,8 @@ public class SimpleTaskQuery {
private static final String LINK = " and ";
private String taskId;
private String jobId;
private String instanceId;
private Long jobId;
private Long instanceId;
private String taskName;
private String address;
private Integer status;
@ -37,10 +37,10 @@ public class SimpleTaskQuery {
sb.append("task_id = '").append(taskId).append("'").append(LINK);
}
if (!StringUtils.isEmpty(jobId)) {
sb.append("job_id = '").append(jobId).append("'").append(LINK);
sb.append("job_id = ").append(jobId).append(LINK);
}
if (!StringUtils.isEmpty(instanceId)) {
sb.append("instance_id = '").append(instanceId).append("'").append(LINK);
sb.append("instance_id = ").append(instanceId).append(LINK);
}
if (!StringUtils.isEmpty(address)) {
sb.append("address = '").append(address).append("'").append(LINK);

View File

@ -35,6 +35,6 @@ public interface TaskDAO {
/**
* 查询 taskId -> taskResult (为了性能特殊定制主要是内存占用如果使用 simpleQueryPlus内存中需要同时存在3份数据 是同时存在3份数据吗)
*/
Map<String, String> queryTaskId2TaskResult(String instanceId) throws SQLException;
Map<String, String> queryTaskId2TaskResult(Long instanceId) throws SQLException;
}

View File

@ -22,7 +22,7 @@ public class TaskDAOImpl implements TaskDAO {
public void initTable() throws Exception {
String delTableSQL = "drop table if exists task_info";
String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content blob, address varchar(20), status int(11), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), unique KEY pkey (instance_id, task_id))";
String createTableSQL = "create table task_info (task_id varchar(20), instance_id bigint(20), job_id bigint(20), task_name varchar(20), task_content blob, address varchar(20), status int(11), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), unique KEY pkey (instance_id, task_id))";
try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) {
stat.execute(delTableSQL);
@ -130,12 +130,12 @@ public class TaskDAOImpl implements TaskDAO {
}
@Override
public Map<String, String> queryTaskId2TaskResult(String instanceId) throws SQLException {
public Map<String, String> queryTaskId2TaskResult(Long instanceId) throws SQLException {
ResultSet rs = null;
Map<String, String> taskId2Result = Maps.newLinkedHashMapWithExpectedSize(4096);
String sql = "select task_id, result from task_info where instance_id = ?";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, instanceId);
ps.setLong(1, instanceId);
rs = ps.executeQuery();
while (rs.next()) {
taskId2Result.put(rs.getString("task_id"), rs.getString("result"));
@ -154,8 +154,8 @@ public class TaskDAOImpl implements TaskDAO {
private static TaskDO convert(ResultSet rs) throws SQLException {
TaskDO task = new TaskDO();
task.setTaskId(rs.getString("task_id"));
task.setInstanceId(rs.getString("instance_id"));
task.setJobId(rs.getString("job_id"));
task.setInstanceId(rs.getLong("instance_id"));
task.setJobId(rs.getLong("job_id"));
task.setTaskName(rs.getString("task_name"));
task.setTaskContent(rs.getBytes("task_content"));
task.setAddress(rs.getString("address"));
@ -169,8 +169,8 @@ public class TaskDAOImpl implements TaskDAO {
private static void fillInsertPreparedStatement(TaskDO task, PreparedStatement ps) throws SQLException {
ps.setString(1, task.getTaskId());
ps.setString(2, task.getInstanceId());
ps.setString(3, task.getJobId());
ps.setLong(2, task.getInstanceId());
ps.setLong(3, task.getJobId());
ps.setString(4, task.getTaskName());
ps.setBytes(5, task.getTaskContent());
ps.setString(6, task.getAddress());

View File

@ -19,8 +19,8 @@ public class TaskDO {
// 层次命名法可以表示 Map 后的父子关系 0.1.2 代表 rootTask map 的第一个 task map 的第二个 task
private String taskId;
private String jobId;
private String instanceId;
private Long jobId;
private Long instanceId;
// 任务名称
private String taskName;
// 任务对象序列化后的二进制数据

View File

@ -70,7 +70,7 @@ public class TaskPersistenceService {
/**
* 依靠主键更新 Task
*/
public boolean updateTask(String instanceId, String taskId, TaskDO updateEntity) {
public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) {
try {
updateEntity.setLastModifiedTime(System.currentTimeMillis());
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
@ -110,7 +110,7 @@ public class TaskPersistenceService {
/**
* 获取 MapReduce Broadcast 的最后一个任务
*/
public Optional<TaskDO> getLastTask(String instanceId) {
public Optional<TaskDO> getLastTask(Long instanceId) {
try {
SimpleTaskQuery query = new SimpleTaskQuery();
@ -130,7 +130,7 @@ public class TaskPersistenceService {
return Optional.empty();
}
public List<TaskDO> getAllTask(String instanceId) {
public List<TaskDO> getAllTask(Long instanceId) {
try {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
@ -146,7 +146,7 @@ public class TaskPersistenceService {
/**
* 获取指定状态的Task
*/
public List<TaskDO> getTaskByStatus(String instanceId, TaskStatus status, int limit) {
public List<TaskDO> getTaskByStatus(Long instanceId, TaskStatus status, int limit) {
try {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
@ -163,7 +163,7 @@ public class TaskPersistenceService {
* 获取 TaskTracker 管理的子 task 状态统计信息
* TaskStatus -> num
*/
public Map<TaskStatus, Long> getTaskStatusStatistics(String instanceId) {
public Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId) {
try {
SimpleTaskQuery query = new SimpleTaskQuery();
@ -191,7 +191,7 @@ public class TaskPersistenceService {
/**
* 查询 taskId -> taskResultreduce阶段或postProcess 阶段使用
*/
public Map<String, String> getTaskId2ResultMap(String instanceId) {
public Map<String, String> getTaskId2ResultMap(Long instanceId) {
try {
return execute(() -> taskDAO.queryTaskId2TaskResult(instanceId));
}catch (Exception e) {
@ -203,7 +203,7 @@ public class TaskPersistenceService {
/**
* 查询任务状态只查询 status节约 I/O 资源 -> 测试表明效果惊人...磁盘I/O果然是重要瓶颈...
*/
public Optional<TaskStatus> getTaskStatus(String instanceId, String taskId) {
public Optional<TaskStatus> getTaskStatus(Long instanceId, String taskId) {
try {
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
@ -221,7 +221,7 @@ public class TaskPersistenceService {
/**
* 查询任务失败数量只查询 failed_cnt节约 I/O 资源
*/
public Optional<Integer> getTaskFailedCnt(String instanceId, String taskId) {
public Optional<Integer> getTaskFailedCnt(Long instanceId, String taskId) {
try {
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
@ -241,7 +241,7 @@ public class TaskPersistenceService {
/**
* 批量更新 Task 状态
*/
public boolean batchUpdateTaskStatus(String instanceId, List<String> taskIds, TaskStatus status, String result) {
public boolean batchUpdateTaskStatus(Long instanceId, List<String> taskIds, TaskStatus status, String result) {
try {
return execute(() -> {
@ -262,7 +262,7 @@ public class TaskPersistenceService {
}
public boolean deleteAllTasks(String instanceId) {
public boolean deleteAllTasks(Long instanceId) {
try {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
@ -286,7 +286,7 @@ public class TaskPersistenceService {
return Collections.emptyList();
}
private static SimpleTaskQuery genKeyQuery(String instanceId, String taskId) {
private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
condition.setTaskId(taskId);

View File

@ -16,8 +16,8 @@ public class InstanceInfo implements Serializable {
/**
* 基础信息
*/
private String jobId;
private String instanceId;
private Long jobId;
private Long instanceId;
/**
* 任务执行处理器信息

View File

@ -13,7 +13,7 @@ import java.io.Serializable;
@Data
public class BroadcastTaskPreExecuteFinishedReq implements Serializable {
private String instanceId;
private Long instanceId;
private String taskId;
private boolean success;

View File

@ -21,7 +21,7 @@ import java.util.List;
@NoArgsConstructor
public class ProcessorMapTaskRequest implements Serializable {
private String instanceId;
private Long instanceId;
private String taskName;
private List<SubTask> subTasks;

View File

@ -13,7 +13,7 @@ import java.io.Serializable;
@Data
public class ProcessorReportTaskStatusReq implements Serializable {
private String instanceId;
private Long instanceId;
private String taskId;
private int status;

View File

@ -14,7 +14,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
public class ProcessorTrackerStatusReportReq {
private String instanceId;
private Long instanceId;
/**
* 请求发起时间
@ -31,7 +31,7 @@ public class ProcessorTrackerStatusReportReq {
*/
private String ip;
public ProcessorTrackerStatusReportReq(String instanceId, long remainTaskNum) {
public ProcessorTrackerStatusReportReq(Long instanceId, long remainTaskNum) {
this.instanceId = instanceId;
this.remainTaskNum = remainTaskNum;

View File

@ -14,7 +14,7 @@ import java.io.Serializable;
@Data
public class TaskTrackerStopInstanceReq implements Serializable {
private String instanceId;
private Long instanceId;
// 保留字段暂时没用
private String type;
}

View File

@ -29,8 +29,8 @@ public class PersistenceServiceTest {
TaskDO task = new TaskDO();
taskList.add(task);
task.setJobId("1");
task.setInstanceId("10086" + ThreadLocalRandom.current().nextInt(2));
task.setJobId(1L);
task.setInstanceId(10086L + ThreadLocalRandom.current().nextInt(2));
task.setTaskId(i + "");
task.setFailedCnt(0);
task.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
@ -63,7 +63,7 @@ public class PersistenceServiceTest {
public void testDeleteAllTasks() {
System.out.println("=============== testBatchDelete ===============");
boolean delete = taskPersistenceService.deleteAllTasks("100860");
boolean delete = taskPersistenceService.deleteAllTasks(100860L);
System.out.println("delete result:" + delete);
}

View File

@ -64,8 +64,8 @@ public class ProcessorTrackerTest {
InstanceInfo instanceInfo = new InstanceInfo();
instanceInfo.setJobId("1");
instanceInfo.setInstanceId("10086");
instanceInfo.setJobId(1L);
instanceInfo.setInstanceId(10086L);
instanceInfo.setExecuteType(ExecuteType.STANDALONE.name());
instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());

View File

@ -10,6 +10,7 @@ import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.common.utils.NetUtils;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@ -54,9 +55,9 @@ public class TaskTrackerTest {
private static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType) {
ServerScheduleJobReq req = new ServerScheduleJobReq();
req.setJobId("1");
req.setInstanceId("10086");
req.setAllWorkerAddress(NetUtils.getLocalHost());
req.setJobId(1L);
req.setInstanceId(10086L);
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost()));
req.setJobParams("this is job Params");
req.setInstanceParams("this is instance Params");