diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index 83b9810f..e037542c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -1,14 +1,30 @@ package com.github.kfcfans.oms.worker; +import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.Props; +import com.github.kfcfans.oms.worker.actors.TaskTrackerActor; import com.github.kfcfans.oms.worker.common.OhMyConfig; +import com.github.kfcfans.oms.worker.common.constants.AkkaConstant; +import com.github.kfcfans.oms.worker.common.utils.NetUtils; import com.github.kfcfans.oms.worker.common.utils.SpringUtils; +import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTracker; +import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; +import org.springframework.util.StopWatch; +import org.springframework.util.StringUtils; + +import java.util.Map; /** * 客户端启动类 @@ -16,12 +32,11 @@ import org.springframework.context.ApplicationContextAware; * @author KFCFans * @since 2020/3/16 */ +@Slf4j public class OhMyWorker implements ApplicationContextAware, InitializingBean { - public static ActorSystem actorSystem; - @Getter - @Setter private static OhMyConfig config; + public static ActorSystem actorSystem; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { @@ -35,5 +50,39 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean { public void init() { + Stopwatch stopwatch = Stopwatch.createStarted(); + log.info("[OhMyWorker] start to initialize OhMyWorker..."); + + try { + + // 初始化 ActorSystem + Map overrideConfig = Maps.newHashMap(); + String localIP = StringUtils.isEmpty(config.getListeningIP()) ? NetUtils.getLocalHost() : config.getListeningIP(); + overrideConfig.put("akka.remote.artery.canonical.hostname", localIP); + if (config.getListeningPort() != null) { + overrideConfig.put("akka.remote.artery.canonical.port", config.getListeningPort()); + } + log.info("[OhMyWorker] akka-remote listening address config: {}", overrideConfig); + Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG_NAME); + Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); + + actorSystem = ActorSystem.create(AkkaConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig); + actorSystem.actorOf(Props.create(TaskTrackerActor.class)); + actorSystem.actorOf(Props.create(ProcessorTracker.class)); + + // 初始化存储 + TaskPersistenceService.INSTANCE.init(); + + }catch (Exception e) { + log.error("[OhMyWorker] initialize OhMyWorker failed, using {}.", stopwatch, e); + } + + } + + public static OhMyConfig getConfig() { + return config; + } + public void setConfig(OhMyConfig cfg) { + config = cfg; } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java index 238082cb..e6fa28d8 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ProcessorTrackerActor.java @@ -27,8 +27,13 @@ public class ProcessorTrackerActor extends AbstractActor { * 处理来自TaskTracker的task执行请求 */ private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { + String jobId = req.getJobId(); String instanceId = req.getInstanceId(); - ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> new ProcessorTracker(req.getThreadConcurrency())); - processorTracker.submitTask(req, getSender()); + ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> { + ProcessorTracker pt = new ProcessorTracker(req); + log.info("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) success.", jobId, instanceId); + return pt; + }); + processorTracker.submitTask(req); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java index feaa427e..74b87dbb 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java @@ -21,5 +21,6 @@ public class OhMyConfig { /** * 通讯端口 */ - private int listeningPort; + private String listeningIP; + private Integer listeningPort; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java index c2caa1fb..dacdf97c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java @@ -14,6 +14,8 @@ public class AkkaConstant { public static final String ACTOR_SYSTEM_NAME = "oms"; public static final String Task_TRACKER_ACTOR_NAME = "task_tracker"; - public static final String WORKER_ACTOR_NAME = "worker"; + public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker"; + + public static final String AKKA_CONFIG_NAME = "oms-akka-application.conf"; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index fbefd16a..1261d304 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.worker.core.executor; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.ProcessorType; @@ -31,7 +32,7 @@ import org.springframework.beans.BeanUtils; @AllArgsConstructor public class ProcessorRunnable implements Runnable { - private final ActorRef taskTrackerActor; + private final ActorSelection taskTrackerActor; @Getter private final TaskTrackerStartTaskReq request; @@ -85,6 +86,7 @@ public class ProcessorRunnable implements Runnable { TaskContext taskContext = new TaskContext(); BeanUtils.copyProperties(request, taskContext); taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent())); + ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext); // 5. 正式提交运行 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index 8f184465..977d4c32 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -1,14 +1,22 @@ package com.github.kfcfans.oms.worker.core.tracker.processor; -import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.constants.AkkaConstant; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; +import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable; +import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; +import org.springframework.util.CollectionUtils; +import java.util.List; import java.util.concurrent.*; /** @@ -17,20 +25,59 @@ import java.util.concurrent.*; * @author tjq * @since 2020/3/20 */ +@Slf4j public class ProcessorTracker { - private ExecutorService threadPool; + // 记录创建时间 + private long startTime; + private long jobTimeLimitMS; - public ProcessorTracker(int threadConcurrency) { + // 记录该 Job 相关信息 + private String instanceId; + private String executeType; + private String processorType; + private String processorInfo; + private int threadConcurrency; + private String jobParams; + private String instanceParams; + private int maxRetryTimes; - // 初始化运行用的线程池 - BlockingQueue queue = new ArrayBlockingQueue<>(10); - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build(); - RejectedProcessorHandler rejectHandler = new RejectedProcessorHandler(TaskPersistenceService.INSTANCE); - threadPool = new ThreadPoolExecutor(threadConcurrency, threadConcurrency, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectHandler); + private String taskTrackerAddress; + private ActorSelection taskTrackerActorRef; + + private ThreadPoolExecutor threadPool; + private static final int MAX_QUEUE_SIZE = 20; + + /** + * 创建 ProcessorTracker(其实就是创建了个执行用的线程池 T_T) + */ + public ProcessorTracker(TaskTrackerStartTaskReq request) { + + // 赋值 + this.startTime = System.currentTimeMillis(); + this.jobTimeLimitMS = request.getJobTimeLimitMS(); + this.instanceId = request.getInstanceId(); + this.executeType = request.getExecuteType(); + this.processorType = request.getProcessorType(); + this.processorInfo = request.getProcessorInfo(); + this.threadConcurrency = request.getThreadConcurrency(); + this.jobParams = request.getJobParams(); + this.instanceParams = request.getInstanceParams(); + this.maxRetryTimes = request.getMaxRetryTimes(); + this.taskTrackerAddress = request.getTaskTrackerAddress(); + + String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskTrackerAddress, AkkaConstant.Task_TRACKER_ACTOR_NAME); + this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); + + // 初始化 + initProcessorPool(); + initTimingJob(); } - public void submitTask(TaskTrackerStartTaskReq newTaskReq, ActorRef taskTrackerActorRef) { + /** + * 提交任务 + */ + public void submitTask(TaskTrackerStartTaskReq newTaskReq) { // 1. 回复接受成功 ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq(); @@ -38,8 +85,114 @@ public class ProcessorTracker { reportReq.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue()); taskTrackerActorRef.tell(reportReq, null); - // 2. 提交线程池执行 + // 2.1 内存控制,持久化 + if (threadPool.getQueue().size() > MAX_QUEUE_SIZE) { + + TaskDO newTask = new TaskDO(); + BeanUtils.copyProperties(newTaskReq, newTask); + newTask.setTaskContent(newTaskReq.getSubTaskContent()); + newTask.setAddress(newTaskReq.getTaskTrackerAddress()); + newTask.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue()); + newTask.setFailedCnt(newTaskReq.getCurrentRetryTimes()); + newTask.setCreatedTime(System.currentTimeMillis()); + newTask.setLastModifiedTime(System.currentTimeMillis()); + + boolean save = TaskPersistenceService.INSTANCE.save(newTask); + if (save) { + log.debug("[RejectedProcessorHandler] persistent task({}) succeed.", newTask); + }else { + log.warn("[RejectedProcessorHandler] persistent task({}) failed.", newTask); + } + return; + } + + // 2.2 提交执行 ProcessorRunnable processorRunnable = new ProcessorRunnable(taskTrackerActorRef, newTaskReq); threadPool.submit(processorRunnable); } + + /** + * 任务是否超时 + */ + public boolean isTimeout() { + return System.currentTimeMillis() - startTime > jobTimeLimitMS; + } + + + /** + * 初始化线程池 + */ + private void initProcessorPool() { + // 待执行队列,为了防止对内存造成较大压力,内存队列不能太大 + BlockingQueue queue = new LinkedBlockingQueue<>(); + // 自定义线程池中线程名称 + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build(); + threadPool = new ThreadPoolExecutor(threadConcurrency, threadConcurrency, 60L, TimeUnit.SECONDS, queue, threadFactory); + // 当没有任务执行时,允许销毁核心线程(即线程池最终存活线程个数可能为0) + threadPool.allowCoreThreadTimeOut(true); + } + + /** + * 初始化定时任务 + */ + private void initTimingJob() { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build(); + ScheduledExecutorService timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory); + + timingPool.scheduleWithFixedDelay(new PoolStatusCheckRunnable(), 60, 10, TimeUnit.SECONDS); + } + + + /** + * 定期检查线程池运行状态(内存中的任务数量不足,则即使从数据库中获取并提交执行) + */ + private class PoolStatusCheckRunnable implements Runnable { + + @Override + public void run() { + + int queueSize = threadPool.getQueue().size(); + if (queueSize >= MAX_QUEUE_SIZE / 2) { + return; + } + + TaskPersistenceService taskPersistenceService = TaskPersistenceService.INSTANCE; + List taskDOList = taskPersistenceService.getNeedRunTask(instanceId, MAX_QUEUE_SIZE / 2); + + if (CollectionUtils.isEmpty(taskDOList)) { + return; + } + + List deletedIds = Lists.newLinkedList(); + + log.debug("[ProcessorTracker] timing add task to thread pool."); + + // 提交到线程池执行 + taskDOList.forEach(task -> { + runTask(task); + deletedIds.add(task.getTaskId()); + }); + + // 删除任务 + taskPersistenceService.batchDelete(instanceId, deletedIds); + } + + private void runTask(TaskDO task) { + TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq(); + BeanUtils.copyProperties(task, req); + + req.setExecuteType(executeType); + req.setProcessorType(processorType); + req.setProcessorInfo(processorInfo); + req.setTaskTrackerAddress(taskTrackerAddress); + req.setJobParams(jobParams); + req.setInstanceParams(instanceParams); + req.setSubTaskContent(task.getTaskContent()); + req.setMaxRetryTimes(maxRetryTimes); + req.setCurrentRetryTimes(task.getFailedCnt()); + + ProcessorRunnable processorRunnable = new ProcessorRunnable(taskTrackerActorRef, req); + threadPool.submit(processorRunnable); + } + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java index 857eef51..c91914e3 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTrackerPool.java @@ -16,6 +16,9 @@ public class ProcessorTrackerPool { private static final Map instanceId2ProcessorTracker = Maps.newConcurrentMap(); + /** + * 获取 ProcessorTracker,如果不存在则创建 + */ public static ProcessorTracker getProcessorTracker(String instanceId, Function creator) { return instanceId2ProcessorTracker.computeIfAbsent(instanceId, creator); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/RejectedProcessorHandler.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/RejectedProcessorHandler.java deleted file mode 100644 index 911c9ad9..00000000 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/RejectedProcessorHandler.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.github.kfcfans.oms.worker.core.tracker.processor; - -import com.github.kfcfans.oms.worker.common.constants.TaskStatus; -import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable; -import com.github.kfcfans.oms.worker.persistence.TaskDO; -import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; -import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; - -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; - -/** - * 线程池拒绝策略 -> 持久化到本地H2数据库 - * 出于内存占用考虑,线程池的阻塞队列容量不大,大量子任务涌入时需要持久化到本地数据库 - * 第一版先直接写H2,如果发现有性能问题再转变为 内存队列 + 批量写入 模式 - * - * @author tjq - * @since 2020/3/23 - */ -@Slf4j -@AllArgsConstructor -public class RejectedProcessorHandler implements RejectedExecutionHandler { - - private final TaskPersistenceService taskPersistenceService; - - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - - ProcessorRunnable processorRunnable = (ProcessorRunnable) r; - TaskTrackerStartTaskReq startTaskReq = processorRunnable.getRequest(); - - TaskDO newTask = new TaskDO(); - BeanUtils.copyProperties(startTaskReq, newTask); - newTask.setTaskContent(startTaskReq.getSubTaskContent()); - newTask.setAddress(startTaskReq.getTaskTrackerAddress()); - newTask.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue()); - newTask.setFailedCnt(0); - newTask.setCreatedTime(System.currentTimeMillis()); - newTask.setLastModifiedTime(System.currentTimeMillis()); - - boolean save = taskPersistenceService.save(newTask); - if (save) { - log.debug("[RejectedProcessorHandler] persistent task({}) succeed.", newTask); - }else { - log.warn("[RejectedProcessorHandler] persistent task({}) failed.", newTask); - } - } -} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index e34badcd..f0919bb8 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -2,7 +2,6 @@ package com.github.kfcfans.oms.worker.core.tracker.task; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.JobInstanceStatus; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.oms.worker.OhMyWorker; @@ -17,7 +16,6 @@ import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; @@ -36,6 +34,10 @@ import java.util.concurrent.atomic.AtomicBoolean; @Slf4j public abstract class TaskTracker { + protected long startTime; + protected long jobTimeLimitMS; + + // 任务实例信息 protected JobInstanceInfo jobInstanceInfo; protected ActorRef taskTrackerActorRef; @@ -49,6 +51,9 @@ public abstract class TaskTracker { public TaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) { + this.startTime = System.currentTimeMillis(); + this.jobTimeLimitMS = jobInstanceInfo.getTimeLimit(); + this.jobInstanceInfo = jobInstanceInfo; this.taskTrackerActorRef = taskTrackerActorRef; this.taskPersistenceService = TaskPersistenceService.INSTANCE; @@ -92,6 +97,13 @@ public abstract class TaskTracker { return finished.get(); } + /** + * 任务是否超时 + */ + public boolean isTimeout() { + return System.currentTimeMillis() - startTime > jobTimeLimitMS; + } + /** * 持久化根任务,只有完成持久化才能视为任务开始running(先持久化,再报告server) */ @@ -135,7 +147,7 @@ public abstract class TaskTracker { if (StringUtils.isEmpty(targetIP)) { targetIP = allWorkerAddress.get(ThreadLocalRandom.current().nextInt(allWorkerAddress.size())); } - String targetPath = AkkaUtils.getAkkaRemotePath(targetIP, AkkaConstant.WORKER_ACTOR_NAME); + String targetPath = AkkaUtils.getAkkaRemotePath(targetIP, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME); ActorSelection targetActor = OhMyWorker.actorSystem.actorSelection(targetPath); // 发送请求(Akka的tell是至少投递一次,经实验表明无法投递消息也不会报错...印度啊...) diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java index 7d178dc6..49daa242 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java @@ -15,7 +15,7 @@ public interface TaskDAO { /** * 初始化任务表 */ - boolean initTable(); + void initTable() throws Exception; /** * 插入任务数据 @@ -28,6 +28,8 @@ public interface TaskDAO { */ boolean update(TaskDO task); + int batchDelete(String instanceId, List taskIds); + TaskDO selectByKey(String instanceId, String taskId); List simpleQuery(SimpleTaskQuery query); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java index e33c42d5..19ee154b 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -4,6 +4,7 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; import java.sql.*; import java.util.Collection; @@ -20,7 +21,7 @@ import java.util.Map; public class TaskDAOImpl implements TaskDAO { @Override - public boolean initTable() { + public void initTable() throws Exception { String delTableSQL = "drop table if exists task_info"; String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content blob, address varchar(20), status int(11), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), unique KEY pkey (instance_id, task_id))"; @@ -28,11 +29,7 @@ public class TaskDAOImpl implements TaskDAO { try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { stat.execute(delTableSQL); stat.execute(createTableSQL); - }catch (Exception e) { - log.error("[TaskDAO] initTable failed.", e); - return false; } - return true; } @Override @@ -73,6 +70,20 @@ public class TaskDAOImpl implements TaskDAO { return false; } + @Override + public int batchDelete(String instanceId, List taskIds) { + String deleteSQL = "delete from task_info where instance_id = %s and task_id in %s"; + String sql = String.format(deleteSQL, instanceId, getInStringCondition(taskIds)); + try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { + + return stat.executeUpdate(sql); + + }catch (Exception e) { + log.error("[TaskDAO] batchDelete failed(instanceId = {}, taskIds = {}).", instanceId, taskIds, e); + } + return 0; + } + @Override public TaskDO selectByKey(String instanceId, String taskId) { String selectSQL = "select * from task_info where instance_id = ? and task_id = ?"; @@ -195,7 +206,19 @@ public class TaskDAOImpl implements TaskDAO { ps.setLong(11, task.getLastModifiedTime()); } + private static String getInStringCondition(Collection collection) { + if (CollectionUtils.isEmpty(collection)) { + return "()"; + } + StringBuilder sb = new StringBuilder(" ( "); + collection.forEach(str -> sb.append("'").append(str).append("',")); + return sb.replace(sb.length() -1, sb.length(), " ) ").toString(); + } + public static void main(String[] args) throws Exception { + + System.out.println(getInStringCondition(Lists.newArrayList("2.1"))); + TaskDAOImpl taskDAO = new TaskDAOImpl(); taskDAO.initTable(); @@ -234,6 +257,9 @@ public class TaskDAOImpl implements TaskDAO { List> dbRES = taskDAO.simpleQueryPlus(query); System.out.println(dbRES); + System.out.println("=========== start to delete ==========="); + System.out.println(taskDAO.batchDelete("22", Lists.newArrayList("2.1")));; + Thread.sleep(100000); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index ac1c7719..10bbc750 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -17,6 +17,7 @@ import java.util.Map; */ public class TaskPersistenceService { + private static volatile boolean initialized = false; public static TaskPersistenceService INSTANCE = new TaskPersistenceService(); private TaskPersistenceService() { @@ -25,6 +26,13 @@ public class TaskPersistenceService { private TaskDAO taskDAO = new TaskDAOImpl(); private static final int MAX_BATCH_SIZE = 50; + public void init() throws Exception { + if (initialized) { + return; + } + taskDAO.initTable(); + } + public boolean save(TaskDO task) { boolean success = taskDAO.save(task); if (!success) { @@ -97,4 +105,21 @@ public class TaskPersistenceService { }); return result; } + + /** + * 获取需要被执行的任务 + */ + public List getNeedRunTask(String instanceId, int limit) { + + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue()); + query.setLimit(limit); + + return taskDAO.simpleQuery(query); + } + + public int batchDelete(String instanceId, List taskIds) { + return taskDAO.batchDelete(instanceId, taskIds); + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java index f42fb59b..a363c92a 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -28,6 +28,8 @@ public class TaskTrackerStartTaskReq { private int threadConcurrency; // TaskTracker 地址 private String taskTrackerAddress; + // 任务超时时间 + private long jobTimeLimitMS; private String jobParams; private String instanceParams; @@ -48,6 +50,7 @@ public class TaskTrackerStartTaskReq { threadConcurrency = instanceInfo.getThreadConcurrency(); executeType = instanceInfo.getExecuteType(); taskTrackerAddress = NetUtils.getLocalHost(); + jobTimeLimitMS = instanceInfo.getTimeLimit(); jobParams = instanceInfo.getJobParams(); instanceParams = instanceInfo.getInstanceParams();