diff --git a/.DS_Store b/.DS_Store index 40185804..d136aede 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/oh-my-scheduler-client/pom.xml b/oh-my-scheduler-client/pom.xml index 36051002..6ab10fb3 100644 --- a/oh-my-scheduler-client/pom.xml +++ b/oh-my-scheduler-client/pom.xml @@ -5,16 +5,16 @@ oh-my-scheduler com.github.kfcfans - 1.0.0-SNAPSHOT + 1.0.0 4.0.0 oh-my-scheduler-client - 1.0.0-SNAPSHOT + 1.0.0 jar - 1.0.0-SNAPSHOT + 1.0.0 5.6.1 diff --git a/oh-my-scheduler-common/pom.xml b/oh-my-scheduler-common/pom.xml index 7a165d10..02ac3024 100644 --- a/oh-my-scheduler-common/pom.xml +++ b/oh-my-scheduler-common/pom.xml @@ -5,18 +5,18 @@ oh-my-scheduler com.github.kfcfans - 1.0.0-SNAPSHOT + 1.0.0 4.0.0 oh-my-scheduler-common - 1.0.0-SNAPSHOT + 1.0.0 jar 1.7.30 3.10 - 28.2-jre + 29.0-jre 4.4.1 2.6.4 diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/OmsSerializable.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/OmsSerializable.java index 26a8b427..483ea820 100644 --- a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/OmsSerializable.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/OmsSerializable.java @@ -3,7 +3,7 @@ package com.github.kfcfans.common; import java.io.Serializable; /** - * OMS 序列化接口 + * OMS 序列化标记接口 * * @author tjq * @since 2020/4/16 diff --git a/oh-my-scheduler-server/pom.xml b/oh-my-scheduler-server/pom.xml index 31d3354e..4fc3f51f 100644 --- a/oh-my-scheduler-server/pom.xml +++ b/oh-my-scheduler-server/pom.xml @@ -5,18 +5,18 @@ oh-my-scheduler com.github.kfcfans - 1.0.0-SNAPSHOT + 1.0.0 4.0.0 oh-my-scheduler-server - 1.0.0-SNAPSHOT + 1.0.0 jar 2.9.2 2.2.6.RELEASE - 1.0.0-SNAPSHOT + 1.0.0 3.4.2 8.0.19 4.3.0 diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java index 48ad3e65..144d2b37 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/repository/InstanceLogRepository.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.server.persistence.repository; import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; @@ -37,15 +38,18 @@ public interface InstanceLogRepository extends JpaRepository findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List jobIds, int status, long time); List findByAppIdInAndStatusAndActualTriggerTimeLessThan(List jobIds, int status, long time); List findByAppIdInAndStatusAndGmtModifiedBefore(List jobIds, int status, Date time); diff --git a/oh-my-scheduler-server/src/main/resources/oms-server.akka.conf b/oh-my-scheduler-server/src/main/resources/oms-server.akka.conf index d0d2da66..ba5b53c5 100644 --- a/oh-my-scheduler-server/src/main/resources/oms-server.akka.conf +++ b/oh-my-scheduler-server/src/main/resources/oms-server.akka.conf @@ -2,7 +2,6 @@ akka { actor { # cluster is better(recommend by official document), but I prefer remote provider = remote - # TODO : 临时使用 Java 序列化,开发完成后切换到 protocol-buffers allow-java-serialization = off serialization-bindings { diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml index 0b612350..3d08c64b 100644 --- a/oh-my-scheduler-worker/pom.xml +++ b/oh-my-scheduler-worker/pom.xml @@ -5,17 +5,17 @@ oh-my-scheduler com.github.kfcfans - 1.0.0-SNAPSHOT + 1.0.0 4.0.0 oh-my-scheduler-worker - 1.0.0-SNAPSHOT + 1.0.0 jar 5.2.4.RELEASE - 1.0.0-SNAPSHOT + 1.0.0 1.4.200 3.4.2 5.6.1 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index ccafb82f..974d3b27 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -23,6 +23,7 @@ import com.typesafe.config.ConfigFactory; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -42,7 +43,7 @@ import java.util.concurrent.TimeUnit; * @since 2020/3/16 */ @Slf4j -public class OhMyWorker implements ApplicationContextAware, InitializingBean { +public class OhMyWorker implements ApplicationContextAware, InitializingBean, DisposableBean { @Getter private static OhMyConfig config; @@ -148,4 +149,9 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean { log.error("[OhMyWorker] no available server in {}.", config.getServerAddress()); throw new RuntimeException("no server available!"); } + + @Override + public void destroy() throws Exception { + timingPool.shutdownNow(); + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java index 23392816..6d2fe306 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java @@ -25,4 +25,9 @@ public class OhMyConfig { * 本地持久化方式,默认使用磁盘 */ private StoreStrategy storeStrategy = StoreStrategy.DISK; + /** + * 最大返回值长度,超过会被截断 + */ + private int maxResultLength = 8096; + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index 8e022ca1..c2bb97db 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.worker.core.executor; import akka.actor.ActorSelection; import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.common.ThreadLocalStore; import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; @@ -65,24 +66,30 @@ public class ProcessorRunnable implements Runnable { if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName())) { // 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task - if (executeType == ExecuteType.BROADCAST && processor instanceof BroadcastProcessor) { + if (executeType == ExecuteType.BROADCAST) { - BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq(); spReq.setTaskId(taskId); spReq.setInstanceId(instanceId); spReq.setSubInstanceId(task.getSubInstanceId()); - try { - ProcessResult processResult = broadcastProcessor.preProcess(taskContext); - spReq.setSuccess(processResult.isSuccess()); - spReq.setMsg(processResult.getMsg()); - }catch (Exception e) { - log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e); - spReq.setSuccess(false); - spReq.setMsg(e.toString()); - } + if (processor instanceof BroadcastProcessor) { + BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; + try { + ProcessResult processResult = broadcastProcessor.preProcess(taskContext); + spReq.setSuccess(processResult.isSuccess()); + spReq.setMsg(suit(processResult.getMsg())); + }catch (Exception e) { + log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e); + spReq.setSuccess(false); + spReq.setMsg(e.toString()); + } + + }else { + spReq.setSuccess(true); + spReq.setMsg("NO_PREPOST_TASK"); + } spReq.setReportTime(System.currentTimeMillis()); taskTrackerActor.tell(spReq, null); @@ -121,7 +128,7 @@ public class ProcessorRunnable implements Runnable { } TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; - reportStatus(status, lastResult.getMsg()); + reportStatus(status, suit(lastResult.getMsg())); log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch); return; @@ -136,7 +143,7 @@ public class ProcessorRunnable implements Runnable { log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e); processResult = new ProcessResult(false, e.toString()); } - reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, processResult.getMsg()); + reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg())); } /** @@ -164,4 +171,16 @@ public class ProcessorRunnable implements Runnable { ThreadLocalStore.clear(); } } + + // 裁剪返回结果到合适的大小 + private String suit(String result) { + + final int maxLength = OhMyWorker.getConfig().getMaxResultLength(); + if (result.length() <= maxLength) { + return result; + } + log.warn("[ProcessorRunnable-{}] task(taskId={})'s result is too large({}>{}), a part will be discarded.", + task.getInstanceId(), task.getTaskId(), result.length(), maxLength); + return result.substring(0, maxLength).concat("..."); + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/PythonProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/PythonProcessor.java index a2a79eae..ba49e6db 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/PythonProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/PythonProcessor.java @@ -1,5 +1,7 @@ package com.github.kfcfans.oms.worker.core.processor.built; +import java.util.concurrent.ExecutorService; + /** * Python 处理器 * @@ -8,13 +10,13 @@ package com.github.kfcfans.oms.worker.core.processor.built; */ public class PythonProcessor extends ScriptProcessor { - public PythonProcessor(Long instanceId, String processorInfo) throws Exception { - super(instanceId, processorInfo); + public PythonProcessor(Long instanceId, String processorInfo, long timeout, ExecutorService pool) throws Exception { + super(instanceId, processorInfo, timeout, pool); } @Override - protected String genScriptPath(Long instanceId) { - return String.format("~/oms/script/python/%d.py", instanceId); + protected String genScriptName(Long instanceId) { + return String.format("python_%d.py", instanceId); } @Override diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java index 3a90292b..f054090d 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ScriptProcessor.java @@ -9,7 +9,11 @@ import org.apache.commons.io.FileUtils; import java.io.*; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** * 脚本处理器 @@ -20,16 +24,21 @@ import java.util.Set; @Slf4j public abstract class ScriptProcessor implements BasicProcessor { - private Long instanceId; + private final Long instanceId; // 脚本绝对路径 - private String scriptPath; + private final String scriptPath; + private final long timeout; + private final ExecutorService threadPool; + private static final String USER_HOME = System.getProperty("user.home", "oms"); private static final Set DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); - public ScriptProcessor(Long instanceId, String processorInfo) throws Exception { + public ScriptProcessor(Long instanceId, String processorInfo, long timeout, ExecutorService pool) throws Exception { this.instanceId = instanceId; - this.scriptPath = genScriptPath(instanceId); + this.scriptPath = USER_HOME + "/oms/script/" + genScriptName(instanceId); + this.timeout = timeout; + this.threadPool = pool; File script = new File(scriptPath); if (script.exists()) { @@ -69,41 +78,44 @@ public abstract class ScriptProcessor implements BasicProcessor { // 2. 执行目标脚本 ProcessBuilder pb = new ProcessBuilder(fetchRunCommand(), scriptPath); Process process = pb.start(); - String s; + StringBuilder inputSB = new StringBuilder(); StringBuilder errorSB = new StringBuilder(); - try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(process.getInputStream())); - BufferedReader stdError = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { - while ((s = stdInput.readLine()) != null) { - inputSB.append(s); - } - while ((s = stdError.readLine()) != null) { - errorSB.append(s); - } - } - process.waitFor(); - String result = null; - if (inputSB.length() > 0) { - result = "input:" + inputSB.toString() + " ; "; - } - if (errorSB.length() > 0) { - result = "error: " + errorSB.toString() + " ; "; - } - if (result == null) { - result = "PROCESS_SUCCESS"; - } + threadPool.submit(() -> copyStream(process.getInputStream(), inputSB)); + threadPool.submit(() -> copyStream(process.getErrorStream(), errorSB)); - log.debug("[ShellProcessor] process result for instance(instanceId={}) is {}.", instanceId, result); - return new ProcessResult(true, result); + try { + boolean s = process.waitFor(timeout, TimeUnit.MILLISECONDS); + if (!s) { + return new ProcessResult(false, "TIMEOUT"); + } + String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString()); + log.debug("[ScriptProcessor] process result for instance(instanceId={}) is {}.", instanceId, result); + return new ProcessResult(true, result); + }catch (InterruptedException ie) { + return new ProcessResult(false, "Interrupted"); + } + } + + private void copyStream(InputStream is, StringBuilder sb) { + String line; + try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + while ((line = br.readLine()) != null) { + sb.append(line); + } + } catch (Exception e) { + log.warn("[ScriptProcessor] copyStream failed.", e); + sb.append("Exception: ").append(e); + } } /** - * 生成绝对脚本路径 + * 生成脚本名称 * @param instanceId 任务实例ID,作为文件名称(使用JobId会有更改不生效的问题) * @return 文件名称 */ - protected abstract String genScriptPath(Long instanceId); + protected abstract String genScriptName(Long instanceId); /** * 获取运行命令(eg,shell返回 /bin/sh) diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ShellProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ShellProcessor.java index f1b017fb..4e479c7b 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ShellProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/built/ShellProcessor.java @@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.core.processor.built; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.ExecutorService; + /** * Shell 处理器 * 由 ProcessorTracker 创建 @@ -12,14 +14,13 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class ShellProcessor extends ScriptProcessor { - - public ShellProcessor(Long instanceId, String processorInfo) throws Exception { - super(instanceId, processorInfo); + public ShellProcessor(Long instanceId, String processorInfo, long timeout, ExecutorService pool) throws Exception { + super(instanceId, processorInfo, timeout, pool); } @Override - protected String genScriptPath(Long instanceId) { - return String.format("~/oms/script/shell/%d.sh", instanceId); + protected String genScriptName(Long instanceId) { + return String.format("shell_%d.sh", instanceId); } @Override diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BasicProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BasicProcessor.java index 141e602e..12dc8a1b 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BasicProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BasicProcessor.java @@ -5,23 +5,30 @@ import com.github.kfcfans.oms.worker.core.processor.ProcessResult; /** * 基础的处理器,适用于单机执行 - * TODO:真实API不包含异常抛出,为了便于开发先加上 * * @author tjq * @since 2020/3/18 */ public interface BasicProcessor { + /** + * 核心处理逻辑 + * @param context 任务上下文,可通过 jobParams 和 instanceParams 分别获取控制台参数和OpenAPI传递的任务实例参数 + * @return 处理结果,msg有长度限制,超长会被裁剪 + * @throws Exception 异常,允许抛出异常,但不推荐,最好由业务开发者自己处理 + */ ProcessResult process(TaskContext context) throws Exception; /** - * Processor 初始化方法 + * 用于构造 Processor 对象,相当于构造方法 + * @throws Exception 异常,抛出异常则视为处理器构造失败,任务直接失败 */ default void init() throws Exception { } /** - * Processor 销毁方法 + * 销毁 Processor 时的回调方法,暂时未被使用 + * @throws Exception 异常 */ default void destroy() throws Exception { } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BroadcastProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BroadcastProcessor.java index 61351be4..5fd56e36 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BroadcastProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/BroadcastProcessor.java @@ -7,7 +7,6 @@ import java.util.Map; /** * 广播执行处理器,适用于广播执行 - * TODO:真实API不包含异常抛出,为了便于开发先加上 * * @author tjq * @since 2020/3/18 @@ -17,9 +16,9 @@ public interface BroadcastProcessor extends BasicProcessor { /** * 在所有节点广播执行前执行,只会在一台机器执行一次 */ - ProcessResult preProcess(TaskContext taskContext) throws Exception; + ProcessResult preProcess(TaskContext context) throws Exception; /** * 在所有节点广播执行完成后执行,只会在一台机器执行一次 */ - ProcessResult postProcess(TaskContext taskContext, Map taskId2Result) throws Exception; + ProcessResult postProcess(TaskContext context, Map taskId2Result) throws Exception; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapReduceProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapReduceProcessor.java index f0aa3baf..37080111 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapReduceProcessor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/sdk/MapReduceProcessor.java @@ -36,7 +36,7 @@ public abstract class MapReduceProcessor implements BasicProcessor { /** * 分发子任务 * @param taskList 子任务,再次执行时可通过 TaskContext#getSubTask 获取 - * @param taskName 子任务名称,作用不大 + * @param taskName 子任务名称,即子任务处理器中 TaskContext#getTaskName 获取到的值 * @return map结果 */ public ProcessResult map(List taskList, String taskName) { @@ -82,5 +82,11 @@ public abstract class MapReduceProcessor implements BasicProcessor { return TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()); } - public abstract ProcessResult reduce(TaskContext taskContext, Map taskId2Result); + /** + * reduce方法将在所有任务结束后调用 + * @param context 任务上下文 + * @param taskId2Result 保存了各个子Task的执行结果 + * @return reduce产生的结果将作为任务最终的返回结果 + */ + public abstract ProcessResult reduce(TaskContext context, Map taskId2Result); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index 648124fe..98914fcc 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.worker.core.tracker.processor; import akka.actor.ActorSelection; +import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.ProcessorType; import com.github.kfcfans.common.utils.CommonUtils; import com.github.kfcfans.oms.worker.OhMyWorker; @@ -65,11 +66,11 @@ public class ProcessorTracker { String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); + // 初始化 线程池 + initThreadPool(); // 初始化 Processor initProcessor(); - - // 初始化 - initThreadPool(); + // 初始化定时任务 initTimingJob(); } @@ -146,7 +147,7 @@ public class ProcessorTracker { */ private void initThreadPool() { - int poolSize = instanceInfo.getThreadConcurrency(); + int poolSize = calThreadPoolSize(); // 待执行队列,为了防止对内存造成较大压力,内存队列不能太大 BlockingQueue queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE); // 自定义线程池中线程名称 @@ -164,7 +165,9 @@ public class ProcessorTracker { * 初始化定时任务 */ private void initTimingJob() { - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build(); + + // 全称 oms-ProcessTracker-TimingPool + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-ProcessorTrackerTimingPool-%d").build(); timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory); timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS); @@ -193,6 +196,9 @@ public class ProcessorTracker { } + /** + * 初始化处理器 Processor + */ private void initProcessor() throws Exception { ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType()); @@ -214,10 +220,10 @@ public class ProcessorTracker { } break; case SHELL: - processor = new ShellProcessor(instanceId, processorInfo); + processor = new ShellProcessor(instanceId, processorInfo, instanceInfo.getInstanceTimeoutMS(), threadPool); break; case PYTHON: - processor = new PythonProcessor(instanceId, processorInfo); + processor = new PythonProcessor(instanceId, processorInfo, instanceInfo.getInstanceTimeoutMS(), threadPool); break; default: log.warn("[ProcessorRunnable-{}] unknown processor type: {}.", instanceId, processorType); @@ -230,4 +236,23 @@ public class ProcessorTracker { } } + /** + * 计算线程池大小 + */ + private int calThreadPoolSize() { + ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); + ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType()); + + if (executeType == ExecuteType.MAP_REDUCE) { + return instanceInfo.getThreadConcurrency(); + } + + // 脚本类需要三个线程(执行线程、输入流、错误流),分配 N + 1个线程给线程池 + if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) { + return 4; + } + + return 2; + } + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index c589fd96..add7dc66 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.worker.core.tracker.task; import akka.actor.ActorSelection; +import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.RemoteConstant; import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.common.model.InstanceDetail; @@ -101,7 +102,7 @@ public abstract class TaskTracker { /* *************************** 对外方法区 *************************** */ /** - * 更新Task状态(任务状态机限定只允许状态变量递增,eg. 允许 FAILED -> SUCCEED,但不允许 SUCCEED -> FAILED) + * 更新Task状态 * @param taskId task的ID(task为任务实例的执行单位) * @param newStatus task的新状态 * @param reportTime 上报时间 @@ -109,11 +110,11 @@ public abstract class TaskTracker { */ public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) { - boolean updateResult; TaskStatus nTaskStatus = TaskStatus.of(newStatus); // 同一个task,串行执行 // 需要保证 worker 的其他代码没有用 taskId 或者 String 作为锁...否则就等着找bug吧...(主要是不舍得加前缀,这用的可以常量池内存啊...) + // taskId其实是可能重复的(同一台机器上多个 TaskTracker...不过真实冲突概率较低,就算冲突了也问题不大,忽略) synchronized (taskId.intern()) { Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId); @@ -133,7 +134,7 @@ public abstract class TaskTracker { } } - // 过滤过期的请求(潜在的集群时间一致性需求,重试跨Worker时时间不一致可能导致问题) + // 过滤过期的请求(潜在的集群时间一致性需求,重试跨Worker时,时间不一致可能导致问题) if (lastReportTime > reportTime) { log.warn("[TaskTracker-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.", lastReportTime, reportTime, instanceId, taskId, newStatus); @@ -147,7 +148,7 @@ public abstract class TaskTracker { int configTaskRetryNum = instanceInfo.getTaskRetryNum(); if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum > 1) { - // 失败不是主要的情况,多查一次数据库也问题不大(况且前面有缓存顶着,大部分情况不会去查DB) + // 失败不是主要的情况,多查一次数据库也问题不大(况且前面有缓存顶着,大部分情况之前不会去查DB) Optional taskOpt = taskPersistenceService.getTask(instanceId, taskId); // 查询DB再失败的话,就不重试了... if (taskOpt.isPresent()) { @@ -157,12 +158,16 @@ public abstract class TaskTracker { TaskDO updateEntity = new TaskDO(); updateEntity.setFailedCnt(failedCnt + 1); - // 非本机任务,则更换 ProcessorTracker 地址进行执行 - String oldAddress = taskOpt.get().getAddress(); - if (!StringUtils.isEmpty(oldAddress)) { - if (!oldAddress.equals(OhMyWorker.getWorkerAddress())) { - updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); - } + /* + 地址规则: + 1. 当前存储的地址为任务派发的目的地(ProcessorTracker地址) + 2. 根任务、最终任务必须由TaskTracker所在机器执行(如果是根任务和最终任务,不应当修改地址) + 3. 广播任务每台机器都需要执行,因此不应该重新分配worker(广播任务不应当修改地址) + */ + String taskName = taskOpt.get().getTaskName(); + ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); + if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) { + updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); } updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); @@ -182,7 +187,7 @@ public abstract class TaskTracker { updateEntity.setStatus(nTaskStatus.getValue()); updateEntity.setResult(result); updateEntity.setLastReportTime(reportTime); - updateResult = taskPersistenceService.updateTask(instanceId, taskId, updateEntity); + boolean updateResult = taskPersistenceService.updateTask(instanceId, taskId, updateEntity); if (!updateResult) { log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId); @@ -276,8 +281,8 @@ public abstract class TaskTracker { // 2. 删除所有数据库数据 boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId); if (!dbSuccess) { - log.warn("[TaskTracker-{}] delete tasks from database failed.", instanceId); - taskPersistenceService.deleteAllTasks(instanceId); + log.error("[TaskTracker-{}] delete tasks from database failed, shutdown TaskTracker failed.", instanceId); + return; }else { log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId); } @@ -303,20 +308,27 @@ public abstract class TaskTracker { */ protected void dispatchTask(TaskDO task, String processorTrackerAddress) { - TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task); - - String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); - ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath); - ptActor.tell(startTaskReq, null); - - // 更新 ProcessorTrackerStatus 状态 - ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true); - // 更新数据库(如果更新数据库失败,可能导致重复执行,先不处理) + // 1. 持久化,更新数据库(如果更新数据库失败,可能导致重复执行,先不处理) TaskDO updateEntity = new TaskDO(); updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue()); // 写入处理该任务的 ProcessorTracker updateEntity.setAddress(processorTrackerAddress); - taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity); + boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity); + if (!success) { + log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName()); + return; + } + + // 2. 更新 ProcessorTrackerStatus 状态 + ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true); + // 3. 初始化缓存 + taskId2LastReportTime.put(task.getTaskId(), -1L); + + // 4. 任务派发 + TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task); + String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); + ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath); + ptActor.tell(startTaskReq, null); log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName()); } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java index d61dc2af..59080608 100644 --- a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/ScriptProcessorTest.java @@ -4,6 +4,9 @@ import com.github.kfcfans.oms.worker.core.processor.built.PythonProcessor; import com.github.kfcfans.oms.worker.core.processor.built.ShellProcessor; import org.junit.jupiter.api.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * 测试脚本处理器 * @@ -12,24 +15,27 @@ import org.junit.jupiter.api.Test; */ public class ScriptProcessorTest { + private static final long timeout = 10000; + private static final ExecutorService pool = Executors.newFixedThreadPool(3); + @Test public void testLocalShellProcessor() throws Exception { - ShellProcessor sp = new ShellProcessor(1L, "ls -a"); + ShellProcessor sp = new ShellProcessor(1L, "ls -a", timeout, pool); sp.process(null); - ShellProcessor sp2 = new ShellProcessor(2777L, "pwd"); + ShellProcessor sp2 = new ShellProcessor(2777L, "pwd", timeout, pool); sp2.process(null); } @Test public void testLocalPythonProcessor() throws Exception { - PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'"); + PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'", timeout, pool); pp.process(null); } @Test public void testNetShellProcessor() throws Exception { - ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh"); + ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh", timeout, pool); sp.process(null); } diff --git a/others/logs/TestRecord.md b/others/logs/TestRecord.md index 9d5e3050..d8abfd0e 100644 --- a/others/logs/TestRecord.md +++ b/others/logs/TestRecord.md @@ -51,8 +51,8 @@ java.lang.RuntimeException: create root task failed. 解决方案:印度Windows上getSystemLoadAverage()固定返回-1...太坑了...先做个保护性判断继续测试吧... #### 未知的数组越界问题(可能是数据库性能问题) -秒级Broadcast任务在第四次执行时,当Processor完成执行上报状态时,TaskTracker报错,错误的本质原因是无法从数据库中找到这个task对应的记录... -时间表达式:FIX_DELAY,对应的TaskTracker为FrequentTaskTracker +问题:秒级Broadcast任务在第四次执行时,当Processor完成执行上报状态时,TaskTracker报错,错误的本质原因是无法从数据库中找到这个task对应的记录... +场景:时间表达式:FIX_DELAY,对应的TaskTracker为FrequentTaskTracker 异常堆栈 ```text @@ -90,4 +90,5 @@ java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 2020-04-16 18:05:09 WARN - [TaskTracker-1586857062542] query TaskStatus from DB failed when try to update new TaskStatus(taskId=4,newStatus=6). -``` \ No newline at end of file +``` +解决方案:初步怀疑在连续更改时,由于数据库锁的存在导致行不可见(不知道H2具体的特性)。因此,需要保证同一个taskId串行更新 -> synchronize Yes! \ No newline at end of file diff --git a/pom.xml b/pom.xml index 80a5ffac..acda9ac5 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.kfcfans oh-my-scheduler - 1.0.0-SNAPSHOT + 1.0.0 oh-my-scheduler-worker oh-my-scheduler-server