From 3b547f58eeea1b1c4d0a525699f5647d8e63efba Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 18 Apr 2020 12:21:19 +0800 Subject: [PATCH] After reading Alive, my mood is very complicated --- README.md | 3 +- .../service/instance/InstanceManager.java | 6 +- .../timing/schedule/JobScheduleService.java | 2 +- .../server/web/controller/JobController.java | 13 ++-- .../kfcfans/oms/server/MysteryService.java | 18 +++++ .../oms/server/OhMySchedulerConfig.java | 3 +- .../server/processors/MapProcessorDemo.java | 71 +++++++++++++++++++ .../processors/StandaloneProcessorDemo.java | 5 +- .../tracker/processor/ProcessorTracker.java | 12 ++-- .../worker/core/tracker/task/TaskTracker.java | 3 + .../oms/worker/pojo/model/InstanceInfo.java | 2 + 11 files changed, 118 insertions(+), 20 deletions(-) create mode 100644 oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/MysteryService.java create mode 100644 oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapProcessorDemo.java diff --git a/README.md b/README.md index ac3b9a52..9b07263c 100644 --- a/README.md +++ b/README.md @@ -23,12 +23,13 @@ OhMyScheduler是一个分布式调度平台和分布式计算框架 ### 已完成 * 定时调度功能:支持CRON表达式、固定时间间隔、固定频率和API四种方式。 * 任务执行功能:支持单机、广播和MapReduce三种执行方式。 +* 执行处理器:支持SpringBean、普通Java对象、Shell脚本、Python脚本的执行 * 高可用与水平扩展:调度服务器可以部署任意数量的节点,不存在调度的性能瓶颈。 * 不怎么美观但可以用的前端界面 ### 待开发 * 工作流(任务编排):当前版本勉强可以用MapReduce代替,不过工作流挺酷的,等框架稳定后进行开发。 -* 更多的执行器:当前只支持内置Java执行器,至少需要支持常用的shell、python和外置Java(顺便提供jar包上传下载功能)处理器(这个问题不大,肝就行)。 +* [应用级别资源管理和任务优先级](https://yq.aliyun.com/articles/753141?spm=a2c4e.11153959.teamhomeleft.1.696d60c9vt9lLx):没有机器资源时,进入排队队列。不过我觉得SchedulerX的方案不太行,SchedulerX无抢占,一旦低优先级任务开始运行,那么只能等他执行完成才能开始高优先级任务,这明显不合理。可是考虑抢占的话又要多考虑很多东西...先放在TODO列表吧。 # 参考 >Alibaba SchedulerX 2.0 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index 4e404c7a..3db0f58d 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -89,7 +89,7 @@ public class InstanceManager { // FREQUENT 任务没有失败重试机制,TaskTracker一直运行即可,只需要将存活信息同步到DB即可 // FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行) // 综上,直接把 status 和 runningNum 同步到DB即可 - if (timeExpressionType != TimeExpressionType.CRON.getV()) { + if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) { getInstanceLogRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum()); return; } @@ -107,8 +107,8 @@ public class InstanceManager { log.info("[InstanceManager] instance(instanceId={}) execute succeed.", instanceId); }else if (newStatus == InstanceStatus.FAILED) { - // 当前重试次数 < 最大重试次数,进行重试 - if (updateEntity.getRunningTimes() < instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) { + // 当前重试次数 <= 最大重试次数,进行重试 (第一次运行,runningTimes为1,重试一次,instanceRetryNum也为1,故需要 =) + if (updateEntity.getRunningTimes() <= instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) { log.info("[InstanceManager] instance(instanceId={}) execute failed but will take the {}th retry.", instanceId, updateEntity.getRunningTimes()); getDispatchService().dispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes()); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java index 9cfdf07b..fe324783 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/schedule/JobScheduleService.java @@ -145,7 +145,7 @@ public class JobScheduleService { long targetTriggerTime = jobInfoDO.getNextTriggerTime(); long delay = 0; if (targetTriggerTime < nowTime) { - log.warn("[JobScheduleService] Job({}) was delayed.", jobInfoDO); + log.warn("[JobScheduleService] find a delayed Job: {}.", jobInfoDO); }else { delay = targetTriggerTime - nowTime; } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java index 7f1bd8e0..64341343 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/web/controller/JobController.java @@ -79,21 +79,20 @@ public class JobController { } @GetMapping("/disable") - public ResultDTO disableJob(Long jobId) throws Exception { - jobService.disableJob(jobId); + public ResultDTO disableJob(String jobId) throws Exception { + jobService.disableJob(Long.valueOf(jobId)); return ResultDTO.success(null); } @GetMapping("/delete") - public ResultDTO deleteJob(Long jobId) throws Exception { - jobService.deleteJob(jobId); + public ResultDTO deleteJob(String jobId) throws Exception { + jobService.deleteJob(Long.valueOf(jobId)); return ResultDTO.success(null); } @GetMapping("/run") - public ResultDTO runImmediately(Long jobId) { - jobService.runJob(jobId, null); - return ResultDTO.success(null); + public ResultDTO runImmediately(String jobId) { + return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null)); } @PostMapping("/list") diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/MysteryService.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/MysteryService.java new file mode 100644 index 00000000..cfde81ff --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/MysteryService.java @@ -0,0 +1,18 @@ +package com.github.kfcfans.oms.server; + +import org.springframework.stereotype.Service; + +/** + * 神秘服务 + * + * @author tjq + * @since 2020/4/18 + */ +@Service +public class MysteryService { + + public String hasaki() { + return "面对疾风吧~"; + } + +} diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/OhMySchedulerConfig.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/OhMySchedulerConfig.java index 0914aa60..05a028b7 100644 --- a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/OhMySchedulerConfig.java +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/OhMySchedulerConfig.java @@ -27,7 +27,8 @@ public class OhMySchedulerConfig { OhMyConfig config = new OhMyConfig(); config.setAppName("oms-test"); config.setServerAddress(serverAddress); - config.setStoreStrategy(StoreStrategy.DISK); + // 如果没有大型 Map/MapReduce 的需求,建议使用内存来加速计算 + config.setStoreStrategy(StoreStrategy.MEMORY); // 2. 创建 Worker 对象,设置配置文件 OhMyWorker ohMyWorker = new OhMyWorker(); diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapProcessorDemo.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapProcessorDemo.java new file mode 100644 index 00000000..b31a5631 --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapProcessorDemo.java @@ -0,0 +1,71 @@ +package com.github.kfcfans.oms.server.processors; + +import com.github.kfcfans.common.utils.JsonUtils; +import com.github.kfcfans.oms.server.MysteryService; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.MapProcessor; +import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Map处理器 示例 + * + * @author tjq + * @since 2020/4/18 + */ +@Component +public class MapProcessorDemo extends MapProcessor { + + @Resource + private MysteryService mysteryService; + + // 每一批发送任务大小 + private static final int batchSize = 100; + // 发送的批次 + private static final int batchNum = 2; + + @Override + public ProcessResult process(TaskContext context) throws Exception { + + System.out.println("============== MapProcessorDemo#process =============="); + System.out.println("isRootTask:" + isRootTask()); + System.out.println("taskContext:" + JsonUtils.toJSONString(context)); + System.out.println(mysteryService.hasaki()); + + if (isRootTask()) { + System.out.println("==== MAP ===="); + List subTasks = Lists.newLinkedList(); + for (int j = 0; j < batchNum; j++) { + SubTask subTask = new SubTask(); + subTask.siteId = j; + subTask.itemIds = Lists.newLinkedList(); + subTasks.add(subTask); + for (int i = 0; i < batchSize; i++) { + subTask.itemIds.add(i); + } + } + return map(subTasks, "MAP_TEST_TASK"); + }else { + System.out.println("==== PROCESS ===="); + System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask())); + boolean b = ThreadLocalRandom.current().nextBoolean(); + return new ProcessResult(b, "RESULT:" + b); + } + } + + @Getter + @NoArgsConstructor + @AllArgsConstructor + private static class SubTask { + private Integer siteId; + private List itemIds; + } +} diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java index a15fac47..56d9685e 100644 --- a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java @@ -7,8 +7,6 @@ import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.util.concurrent.ThreadLocalRandom; - /** * 单机处理器 示例 * com.github.kfcfans.oms.server.processors.StandaloneProcessorDemo @@ -24,7 +22,8 @@ public class StandaloneProcessorDemo implements BasicProcessor { public ProcessResult process(TaskContext context) throws Exception { System.out.println("================ StandaloneProcessorDemo#process ================"); - boolean success = ThreadLocalRandom.current().nextBoolean(); + // 根据控制台参数判断是否成功 + boolean success = "success".equals(context.getJobParams()); System.out.println("TaskContext: " + JSONObject.toJSONString(context)); System.out.println("ProcessSuccess: " + success); return new ProcessResult(success, context + ": " + success); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index d2029004..ff2a8da8 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -3,6 +3,7 @@ package com.github.kfcfans.oms.worker.core.tracker.processor; import akka.actor.ActorSelection; import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.ProcessorType; +import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.common.RemoteConstant; @@ -183,10 +184,13 @@ public class ProcessorTracker { public void run() { long interval = System.currentTimeMillis() - startTime; - if (interval > instanceInfo.getInstanceTimeoutMS()) { - log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId); - destroy(); - return; + // 秒级任务的ProcessorTracker不应该关闭 + if (!TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) { + if (interval > instanceInfo.getInstanceTimeoutMS()) { + log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId); + destroy(); + return; + } } long waitingNum = threadPool.getQueue().size(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index 29806fe0..96976c45 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -73,6 +73,9 @@ public abstract class TaskTracker { if (instanceInfo.getInstanceTimeoutMS() <= 0) { instanceInfo.setInstanceTimeoutMS(Long.MAX_VALUE); } + // 赋予时间表达式类型 + instanceInfo.setTimeExpressionType(TimeExpressionType.valueOf(req.getTimeExpressionType()).getV()); + this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress()); this.taskPersistenceService = TaskPersistenceService.INSTANCE; this.finished = new AtomicBoolean(false); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java index cefc171f..9461b544 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/InstanceInfo.java @@ -28,6 +28,8 @@ public class InstanceInfo implements Serializable { private String processorType; // 处理器信息 private String processorInfo; + // 定时类型 + private int timeExpressionType; /** * 超时时间