After reading Alive, my mood is very complicated

This commit is contained in:
tjq 2020-04-18 12:21:19 +08:00
parent f0eaa32ba2
commit 3b547f58ee
11 changed files with 118 additions and 20 deletions

View File

@ -23,12 +23,13 @@ OhMyScheduler是一个分布式调度平台和分布式计算框架
### 已完成
* 定时调度功能支持CRON表达式、固定时间间隔、固定频率和API四种方式。
* 任务执行功能支持单机、广播和MapReduce三种执行方式。
* 执行处理器支持SpringBean、普通Java对象、Shell脚本、Python脚本的执行
* 高可用与水平扩展:调度服务器可以部署任意数量的节点,不存在调度的性能瓶颈。
* 不怎么美观但可以用的前端界面
### 待开发
* 工作流任务编排当前版本勉强可以用MapReduce代替不过工作流挺酷的等框架稳定后进行开发。
* 更多的执行器当前只支持内置Java执行器至少需要支持常用的shell、python和外置Java顺便提供jar包上传下载功能处理器这个问题不大肝就行
* [应用级别资源管理和任务优先级](https://yq.aliyun.com/articles/753141?spm=a2c4e.11153959.teamhomeleft.1.696d60c9vt9lLx)没有机器资源时进入排队队列。不过我觉得SchedulerX的方案不太行SchedulerX无抢占一旦低优先级任务开始运行那么只能等他执行完成才能开始高优先级任务这明显不合理。可是考虑抢占的话又要多考虑很多东西...先放在TODO列表吧
# 参考
>Alibaba SchedulerX 2.0

View File

@ -89,7 +89,7 @@ public class InstanceManager {
// FREQUENT 任务没有失败重试机制TaskTracker一直运行即可只需要将存活信息同步到DB即可
// FREQUENT 任务的 newStatus 只有2中情况一种是 RUNNING一种是 FAILED表示该机器 overload需要重新选一台机器执行
// 综上直接把 status runningNum 同步到DB即可
if (timeExpressionType != TimeExpressionType.CRON.getV()) {
if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) {
getInstanceLogRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum());
return;
}
@ -107,8 +107,8 @@ public class InstanceManager {
log.info("[InstanceManager] instance(instanceId={}) execute succeed.", instanceId);
}else if (newStatus == InstanceStatus.FAILED) {
// 当前重试次数 < 最大重试次数进行重试
if (updateEntity.getRunningTimes() < instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) {
// 当前重试次数 <= 最大重试次数进行重试 第一次运行runningTimes为1重试一次instanceRetryNum也为1故需要 =
if (updateEntity.getRunningTimes() <= instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) {
log.info("[InstanceManager] instance(instanceId={}) execute failed but will take the {}th retry.", instanceId, updateEntity.getRunningTimes());
getDispatchService().dispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes());

View File

@ -145,7 +145,7 @@ public class JobScheduleService {
long targetTriggerTime = jobInfoDO.getNextTriggerTime();
long delay = 0;
if (targetTriggerTime < nowTime) {
log.warn("[JobScheduleService] Job({}) was delayed.", jobInfoDO);
log.warn("[JobScheduleService] find a delayed Job: {}.", jobInfoDO);
}else {
delay = targetTriggerTime - nowTime;
}

View File

@ -79,21 +79,20 @@ public class JobController {
}
@GetMapping("/disable")
public ResultDTO<Void> disableJob(Long jobId) throws Exception {
jobService.disableJob(jobId);
public ResultDTO<Void> disableJob(String jobId) throws Exception {
jobService.disableJob(Long.valueOf(jobId));
return ResultDTO.success(null);
}
@GetMapping("/delete")
public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
jobService.deleteJob(jobId);
public ResultDTO<Void> deleteJob(String jobId) throws Exception {
jobService.deleteJob(Long.valueOf(jobId));
return ResultDTO.success(null);
}
@GetMapping("/run")
public ResultDTO<Void> runImmediately(Long jobId) {
jobService.runJob(jobId, null);
return ResultDTO.success(null);
public ResultDTO<Long> runImmediately(String jobId) {
return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null));
}
@PostMapping("/list")

View File

@ -0,0 +1,18 @@
package com.github.kfcfans.oms.server;
import org.springframework.stereotype.Service;
/**
* 神秘服务
*
* @author tjq
* @since 2020/4/18
*/
@Service
public class MysteryService {
public String hasaki() {
return "面对疾风吧~";
}
}

View File

@ -27,7 +27,8 @@ public class OhMySchedulerConfig {
OhMyConfig config = new OhMyConfig();
config.setAppName("oms-test");
config.setServerAddress(serverAddress);
config.setStoreStrategy(StoreStrategy.DISK);
// 如果没有大型 Map/MapReduce 的需求建议使用内存来加速计算
config.setStoreStrategy(StoreStrategy.MEMORY);
// 2. 创建 Worker 对象设置配置文件
OhMyWorker ohMyWorker = new OhMyWorker();

View File

@ -0,0 +1,71 @@
package com.github.kfcfans.oms.server.processors;
import com.github.kfcfans.common.utils.JsonUtils;
import com.github.kfcfans.oms.server.MysteryService;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.MapProcessor;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* Map处理器 示例
*
* @author tjq
* @since 2020/4/18
*/
@Component
public class MapProcessorDemo extends MapProcessor {
@Resource
private MysteryService mysteryService;
// 每一批发送任务大小
private static final int batchSize = 100;
// 发送的批次
private static final int batchNum = 2;
@Override
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("============== MapProcessorDemo#process ==============");
System.out.println("isRootTask:" + isRootTask());
System.out.println("taskContext:" + JsonUtils.toJSONString(context));
System.out.println(mysteryService.hasaki());
if (isRootTask()) {
System.out.println("==== MAP ====");
List<SubTask> subTasks = Lists.newLinkedList();
for (int j = 0; j < batchNum; j++) {
SubTask subTask = new SubTask();
subTask.siteId = j;
subTask.itemIds = Lists.newLinkedList();
subTasks.add(subTask);
for (int i = 0; i < batchSize; i++) {
subTask.itemIds.add(i);
}
}
return map(subTasks, "MAP_TEST_TASK");
}else {
System.out.println("==== PROCESS ====");
System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask()));
boolean b = ThreadLocalRandom.current().nextBoolean();
return new ProcessResult(b, "RESULT:" + b);
}
}
@Getter
@NoArgsConstructor
@AllArgsConstructor
private static class SubTask {
private Integer siteId;
private List<Integer> itemIds;
}
}

View File

@ -7,8 +7,6 @@ import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadLocalRandom;
/**
* 单机处理器 示例
* com.github.kfcfans.oms.server.processors.StandaloneProcessorDemo
@ -24,7 +22,8 @@ public class StandaloneProcessorDemo implements BasicProcessor {
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("================ StandaloneProcessorDemo#process ================");
boolean success = ThreadLocalRandom.current().nextBoolean();
// 根据控制台参数判断是否成功
boolean success = "success".equals(context.getJobParams());
System.out.println("TaskContext: " + JSONObject.toJSONString(context));
System.out.println("ProcessSuccess: " + success);
return new ProcessResult(success, context + ": " + success);

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.oms.worker.core.tracker.processor;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.common.RemoteConstant;
@ -183,10 +184,13 @@ public class ProcessorTracker {
public void run() {
long interval = System.currentTimeMillis() - startTime;
if (interval > instanceInfo.getInstanceTimeoutMS()) {
log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId);
destroy();
return;
// 秒级任务的ProcessorTracker不应该关闭
if (!TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) {
if (interval > instanceInfo.getInstanceTimeoutMS()) {
log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId);
destroy();
return;
}
}
long waitingNum = threadPool.getQueue().size();

View File

@ -73,6 +73,9 @@ public abstract class TaskTracker {
if (instanceInfo.getInstanceTimeoutMS() <= 0) {
instanceInfo.setInstanceTimeoutMS(Long.MAX_VALUE);
}
// 赋予时间表达式类型
instanceInfo.setTimeExpressionType(TimeExpressionType.valueOf(req.getTimeExpressionType()).getV());
this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress());
this.taskPersistenceService = TaskPersistenceService.INSTANCE;
this.finished = new AtomicBoolean(false);

View File

@ -28,6 +28,8 @@ public class InstanceInfo implements Serializable {
private String processorType;
// 处理器信息
private String processorInfo;
// 定时类型
private int timeExpressionType;
/**
* 超时时间