optimize script processor

This commit is contained in:
tjq 2020-04-17 12:39:30 +08:00
parent a9eef1f16d
commit dc123e621f
22 changed files with 216 additions and 112 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@ -5,16 +5,16 @@
<parent>
<artifactId>oh-my-scheduler</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oh-my-scheduler-client</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<oms.common.version>1.0.0</oms.common.version>
<junit.version>5.6.1</junit.version>
</properties>

View File

@ -5,18 +5,18 @@
<parent>
<artifactId>oh-my-scheduler</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oh-my-scheduler-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<slf4j.version>1.7.30</slf4j.version>
<commons.lang.version>3.10</commons.lang.version>
<guava.version>28.2-jre</guava.version>
<guava.version>29.0-jre</guava.version>
<okhttp.version>4.4.1</okhttp.version>
<akka.version>2.6.4</akka.version>
</properties>

View File

@ -3,7 +3,7 @@ package com.github.kfcfans.common;
import java.io.Serializable;
/**
* OMS 序列化接口
* OMS 序列化标记接口
*
* @author tjq
* @since 2020/4/16

View File

@ -5,18 +5,18 @@
<parent>
<artifactId>oh-my-scheduler</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oh-my-scheduler-server</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<oms.common.version>1.0.0</oms.common.version>
<hikaricp.version>3.4.2</hikaricp.version>
<mysql.version>8.0.19</mysql.version>
<curator.version>4.3.0</curator.version>

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
@ -37,15 +38,18 @@ public interface InstanceLogRepository extends JpaRepository<InstanceLogDO, Long
*/
@Transactional
@Modifying
@CanIgnoreReturnValue
@Query(value = "update instance_log set status = ?2, running_times = ?3, actual_trigger_time = ?4, task_tracker_address = ?5, result = ?6, gmt_modified = now() where instance_id = ?1", nativeQuery = true)
int update4Trigger(long instanceId, int status, long runningTimes, long actualTriggerTime, String taskTrackerAddress, String result);
@Modifying
@Transactional
@CanIgnoreReturnValue
@Query(value = "update instance_log set status = ?2, running_times = ?3, gmt_modified = now() where instance_id = ?1", nativeQuery = true)
int update4FrequentJob(long instanceId, int status, long runningTimes);
// 状态检查三兄弟对应 WAITING_DISPATCH WAITING_WORKER_RECEIVE RUNNING 三阶段
// 数据量一般不大就不单独写SQL优化 IO
List<InstanceLogDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> jobIds, int status, long time);
List<InstanceLogDO> findByAppIdInAndStatusAndActualTriggerTimeLessThan(List<Long> jobIds, int status, long time);
List<InstanceLogDO> findByAppIdInAndStatusAndGmtModifiedBefore(List<Long> jobIds, int status, Date time);

View File

@ -2,7 +2,6 @@ akka {
actor {
# cluster is better(recommend by official document), but I prefer remote
provider = remote
# TODO : 临时使用 Java 序列化,开发完成后切换到 protocol-buffers
allow-java-serialization = off
serialization-bindings {

View File

@ -5,17 +5,17 @@
<parent>
<artifactId>oh-my-scheduler</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oh-my-scheduler-worker</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<oms.common.version>1.0.0</oms.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -23,6 +23,7 @@ import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
@ -42,7 +43,7 @@ import java.util.concurrent.TimeUnit;
* @since 2020/3/16
*/
@Slf4j
public class OhMyWorker implements ApplicationContextAware, InitializingBean {
public class OhMyWorker implements ApplicationContextAware, InitializingBean, DisposableBean {
@Getter
private static OhMyConfig config;
@ -148,4 +149,9 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
log.error("[OhMyWorker] no available server in {}.", config.getServerAddress());
throw new RuntimeException("no server available!");
}
@Override
public void destroy() throws Exception {
timingPool.shutdownNow();
}
}

View File

@ -25,4 +25,9 @@ public class OhMyConfig {
* 本地持久化方式默认使用磁盘
*/
private StoreStrategy storeStrategy = StoreStrategy.DISK;
/**
* 最大返回值长度超过会被截断
*/
private int maxResultLength = 8096;
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.oms.worker.core.executor;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
@ -65,24 +66,30 @@ public class ProcessorRunnable implements Runnable {
if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName())) {
// 广播执行先选本机执行 preProcess完成后TaskTracker再为所有Worker生成子Task
if (executeType == ExecuteType.BROADCAST && processor instanceof BroadcastProcessor) {
if (executeType == ExecuteType.BROADCAST) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
spReq.setTaskId(taskId);
spReq.setInstanceId(instanceId);
spReq.setSubInstanceId(task.getSubInstanceId());
try {
ProcessResult processResult = broadcastProcessor.preProcess(taskContext);
spReq.setSuccess(processResult.isSuccess());
spReq.setMsg(processResult.getMsg());
}catch (Exception e) {
log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
spReq.setSuccess(false);
spReq.setMsg(e.toString());
}
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
try {
ProcessResult processResult = broadcastProcessor.preProcess(taskContext);
spReq.setSuccess(processResult.isSuccess());
spReq.setMsg(suit(processResult.getMsg()));
}catch (Exception e) {
log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
spReq.setSuccess(false);
spReq.setMsg(e.toString());
}
}else {
spReq.setSuccess(true);
spReq.setMsg("NO_PREPOST_TASK");
}
spReq.setReportTime(System.currentTimeMillis());
taskTrackerActor.tell(spReq, null);
@ -121,7 +128,7 @@ public class ProcessorRunnable implements Runnable {
}
TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, lastResult.getMsg());
reportStatus(status, suit(lastResult.getMsg()));
log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
return;
@ -136,7 +143,7 @@ public class ProcessorRunnable implements Runnable {
log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e);
processResult = new ProcessResult(false, e.toString());
}
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, processResult.getMsg());
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()));
}
/**
@ -164,4 +171,16 @@ public class ProcessorRunnable implements Runnable {
ThreadLocalStore.clear();
}
}
// 裁剪返回结果到合适的大小
private String suit(String result) {
final int maxLength = OhMyWorker.getConfig().getMaxResultLength();
if (result.length() <= maxLength) {
return result;
}
log.warn("[ProcessorRunnable-{}] task(taskId={})'s result is too large({}>{}), a part will be discarded.",
task.getInstanceId(), task.getTaskId(), result.length(), maxLength);
return result.substring(0, maxLength).concat("...");
}
}

View File

@ -1,5 +1,7 @@
package com.github.kfcfans.oms.worker.core.processor.built;
import java.util.concurrent.ExecutorService;
/**
* Python 处理器
*
@ -8,13 +10,13 @@ package com.github.kfcfans.oms.worker.core.processor.built;
*/
public class PythonProcessor extends ScriptProcessor {
public PythonProcessor(Long instanceId, String processorInfo) throws Exception {
super(instanceId, processorInfo);
public PythonProcessor(Long instanceId, String processorInfo, long timeout, ExecutorService pool) throws Exception {
super(instanceId, processorInfo, timeout, pool);
}
@Override
protected String genScriptPath(Long instanceId) {
return String.format("~/oms/script/python/%d.py", instanceId);
protected String genScriptName(Long instanceId) {
return String.format("python_%d.py", instanceId);
}
@Override

View File

@ -9,7 +9,11 @@ import org.apache.commons.io.FileUtils;
import java.io.*;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 脚本处理器
@ -20,16 +24,21 @@ import java.util.Set;
@Slf4j
public abstract class ScriptProcessor implements BasicProcessor {
private Long instanceId;
private final Long instanceId;
// 脚本绝对路径
private String scriptPath;
private final String scriptPath;
private final long timeout;
private final ExecutorService threadPool;
private static final String USER_HOME = System.getProperty("user.home", "oms");
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
public ScriptProcessor(Long instanceId, String processorInfo) throws Exception {
public ScriptProcessor(Long instanceId, String processorInfo, long timeout, ExecutorService pool) throws Exception {
this.instanceId = instanceId;
this.scriptPath = genScriptPath(instanceId);
this.scriptPath = USER_HOME + "/oms/script/" + genScriptName(instanceId);
this.timeout = timeout;
this.threadPool = pool;
File script = new File(scriptPath);
if (script.exists()) {
@ -69,41 +78,44 @@ public abstract class ScriptProcessor implements BasicProcessor {
// 2. 执行目标脚本
ProcessBuilder pb = new ProcessBuilder(fetchRunCommand(), scriptPath);
Process process = pb.start();
String s;
StringBuilder inputSB = new StringBuilder();
StringBuilder errorSB = new StringBuilder();
try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(process.getInputStream()));
BufferedReader stdError = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
while ((s = stdInput.readLine()) != null) {
inputSB.append(s);
}
while ((s = stdError.readLine()) != null) {
errorSB.append(s);
}
}
process.waitFor();
String result = null;
if (inputSB.length() > 0) {
result = "input:" + inputSB.toString() + " ; ";
}
if (errorSB.length() > 0) {
result = "error: " + errorSB.toString() + " ; ";
}
if (result == null) {
result = "PROCESS_SUCCESS";
}
threadPool.submit(() -> copyStream(process.getInputStream(), inputSB));
threadPool.submit(() -> copyStream(process.getErrorStream(), errorSB));
log.debug("[ShellProcessor] process result for instance(instanceId={}) is {}.", instanceId, result);
return new ProcessResult(true, result);
try {
boolean s = process.waitFor(timeout, TimeUnit.MILLISECONDS);
if (!s) {
return new ProcessResult(false, "TIMEOUT");
}
String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString());
log.debug("[ScriptProcessor] process result for instance(instanceId={}) is {}.", instanceId, result);
return new ProcessResult(true, result);
}catch (InterruptedException ie) {
return new ProcessResult(false, "Interrupted");
}
}
private void copyStream(InputStream is, StringBuilder sb) {
String line;
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
while ((line = br.readLine()) != null) {
sb.append(line);
}
} catch (Exception e) {
log.warn("[ScriptProcessor] copyStream failed.", e);
sb.append("Exception: ").append(e);
}
}
/**
* 生成绝对脚本路径
* 生成脚本名称
* @param instanceId 任务实例ID作为文件名称使用JobId会有更改不生效的问题
* @return 文件名称
*/
protected abstract String genScriptPath(Long instanceId);
protected abstract String genScriptName(Long instanceId);
/**
* 获取运行命令egshell返回 /bin/sh

View File

@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.core.processor.built;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
/**
* Shell 处理器
* ProcessorTracker 创建
@ -12,14 +14,13 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ShellProcessor extends ScriptProcessor {
public ShellProcessor(Long instanceId, String processorInfo) throws Exception {
super(instanceId, processorInfo);
public ShellProcessor(Long instanceId, String processorInfo, long timeout, ExecutorService pool) throws Exception {
super(instanceId, processorInfo, timeout, pool);
}
@Override
protected String genScriptPath(Long instanceId) {
return String.format("~/oms/script/shell/%d.sh", instanceId);
protected String genScriptName(Long instanceId) {
return String.format("shell_%d.sh", instanceId);
}
@Override

View File

@ -5,23 +5,30 @@ import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
/**
* 基础的处理器适用于单机执行
* TODO真实API不包含异常抛出为了便于开发先加上
*
* @author tjq
* @since 2020/3/18
*/
public interface BasicProcessor {
/**
* 核心处理逻辑
* @param context 任务上下文可通过 jobParams instanceParams 分别获取控制台参数和OpenAPI传递的任务实例参数
* @return 处理结果msg有长度限制超长会被裁剪
* @throws Exception 异常允许抛出异常但不推荐最好由业务开发者自己处理
*/
ProcessResult process(TaskContext context) throws Exception;
/**
* Processor 初始化方法
* 用于构造 Processor 对象相当于构造方法
* @throws Exception 异常抛出异常则视为处理器构造失败任务直接失败
*/
default void init() throws Exception {
}
/**
* Processor 销毁方法
* 销毁 Processor 时的回调方法暂时未被使用
* @throws Exception 异常
*/
default void destroy() throws Exception {
}

View File

@ -7,7 +7,6 @@ import java.util.Map;
/**
* 广播执行处理器适用于广播执行
* TODO真实API不包含异常抛出为了便于开发先加上
*
* @author tjq
* @since 2020/3/18
@ -17,9 +16,9 @@ public interface BroadcastProcessor extends BasicProcessor {
/**
* 在所有节点广播执行前执行只会在一台机器执行一次
*/
ProcessResult preProcess(TaskContext taskContext) throws Exception;
ProcessResult preProcess(TaskContext context) throws Exception;
/**
* 在所有节点广播执行完成后执行只会在一台机器执行一次
*/
ProcessResult postProcess(TaskContext taskContext, Map<String, String> taskId2Result) throws Exception;
ProcessResult postProcess(TaskContext context, Map<String, String> taskId2Result) throws Exception;
}

View File

@ -36,7 +36,7 @@ public abstract class MapReduceProcessor implements BasicProcessor {
/**
* 分发子任务
* @param taskList 子任务再次执行时可通过 TaskContext#getSubTask 获取
* @param taskName 子任务名称作用不大
* @param taskName 子任务名称即子任务处理器中 TaskContext#getTaskName 获取到的值
* @return map结果
*/
public ProcessResult map(List<?> taskList, String taskName) {
@ -82,5 +82,11 @@ public abstract class MapReduceProcessor implements BasicProcessor {
return TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName());
}
public abstract ProcessResult reduce(TaskContext taskContext, Map<String, String> taskId2Result);
/**
* reduce方法将在所有任务结束后调用
* @param context 任务上下文
* @param taskId2Result 保存了各个子Task的执行结果
* @return reduce产生的结果将作为任务最终的返回结果
*/
public abstract ProcessResult reduce(TaskContext context, Map<String, String> taskId2Result);
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.worker.core.tracker.processor;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.oms.worker.OhMyWorker;
@ -65,11 +66,11 @@ public class ProcessorTracker {
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME);
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
// 初始化 线程池
initThreadPool();
// 初始化 Processor
initProcessor();
// 初始化
initThreadPool();
// 初始化定时任务
initTimingJob();
}
@ -146,7 +147,7 @@ public class ProcessorTracker {
*/
private void initThreadPool() {
int poolSize = instanceInfo.getThreadConcurrency();
int poolSize = calThreadPoolSize();
// 待执行队列为了防止对内存造成较大压力内存队列不能太大
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE);
// 自定义线程池中线程名称
@ -164,7 +165,9 @@ public class ProcessorTracker {
* 初始化定时任务
*/
private void initTimingJob() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build();
// 全称 oms-ProcessTracker-TimingPool
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-ProcessorTrackerTimingPool-%d").build();
timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);
timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS);
@ -193,6 +196,9 @@ public class ProcessorTracker {
}
/**
* 初始化处理器 Processor
*/
private void initProcessor() throws Exception {
ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());
@ -214,10 +220,10 @@ public class ProcessorTracker {
}
break;
case SHELL:
processor = new ShellProcessor(instanceId, processorInfo);
processor = new ShellProcessor(instanceId, processorInfo, instanceInfo.getInstanceTimeoutMS(), threadPool);
break;
case PYTHON:
processor = new PythonProcessor(instanceId, processorInfo);
processor = new PythonProcessor(instanceId, processorInfo, instanceInfo.getInstanceTimeoutMS(), threadPool);
break;
default:
log.warn("[ProcessorRunnable-{}] unknown processor type: {}.", instanceId, processorType);
@ -230,4 +236,23 @@ public class ProcessorTracker {
}
}
/**
* 计算线程池大小
*/
private int calThreadPoolSize() {
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());
if (executeType == ExecuteType.MAP_REDUCE) {
return instanceInfo.getThreadConcurrency();
}
// 脚本类需要三个线程执行线程输入流错误流分配 N + 1个线程给线程池
if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {
return 4;
}
return 2;
}
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.worker.core.tracker.task;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.model.InstanceDetail;
@ -101,7 +102,7 @@ public abstract class TaskTracker {
/* *************************** 对外方法区 *************************** */
/**
* 更新Task状态任务状态机限定只允许状态变量递增eg. 允许 FAILED -> SUCCEED但不允许 SUCCEED -> FAILED
* 更新Task状态
* @param taskId task的IDtask为任务实例的执行单位
* @param newStatus task的新状态
* @param reportTime 上报时间
@ -109,11 +110,11 @@ public abstract class TaskTracker {
*/
public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) {
boolean updateResult;
TaskStatus nTaskStatus = TaskStatus.of(newStatus);
// 同一个task串行执行
// 需要保证 worker 的其他代码没有用 taskId 或者 String 作为锁...否则就等着找bug吧...主要是不舍得加前缀这用的可以常量池内存啊...
// taskId其实是可能重复的同一台机器上多个 TaskTracker...不过真实冲突概率较低就算冲突了也问题不大忽略
synchronized (taskId.intern()) {
Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId);
@ -133,7 +134,7 @@ public abstract class TaskTracker {
}
}
// 过滤过期的请求潜在的集群时间一致性需求重试跨Worker时时间不一致可能导致问题
// 过滤过期的请求潜在的集群时间一致性需求重试跨Worker时时间不一致可能导致问题
if (lastReportTime > reportTime) {
log.warn("[TaskTracker-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
lastReportTime, reportTime, instanceId, taskId, newStatus);
@ -147,7 +148,7 @@ public abstract class TaskTracker {
int configTaskRetryNum = instanceInfo.getTaskRetryNum();
if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum > 1) {
// 失败不是主要的情况多查一次数据库也问题不大况且前面有缓存顶着大部分情况不会去查DB
// 失败不是主要的情况多查一次数据库也问题不大况且前面有缓存顶着大部分情况之前不会去查DB
Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
// 查询DB再失败的话就不重试了...
if (taskOpt.isPresent()) {
@ -157,12 +158,16 @@ public abstract class TaskTracker {
TaskDO updateEntity = new TaskDO();
updateEntity.setFailedCnt(failedCnt + 1);
// 非本机任务则更换 ProcessorTracker 地址进行执行
String oldAddress = taskOpt.get().getAddress();
if (!StringUtils.isEmpty(oldAddress)) {
if (!oldAddress.equals(OhMyWorker.getWorkerAddress())) {
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
}
/*
地址规则
1. 当前存储的地址为任务派发的目的地ProcessorTracker地址
2. 根任务最终任务必须由TaskTracker所在机器执行如果是根任务和最终任务不应当修改地址
3. 广播任务每台机器都需要执行因此不应该重新分配worker广播任务不应当修改地址
*/
String taskName = taskOpt.get().getTaskName();
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
}
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
@ -182,7 +187,7 @@ public abstract class TaskTracker {
updateEntity.setStatus(nTaskStatus.getValue());
updateEntity.setResult(result);
updateEntity.setLastReportTime(reportTime);
updateResult = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
boolean updateResult = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
if (!updateResult) {
log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId);
@ -276,8 +281,8 @@ public abstract class TaskTracker {
// 2. 删除所有数据库数据
boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId);
if (!dbSuccess) {
log.warn("[TaskTracker-{}] delete tasks from database failed.", instanceId);
taskPersistenceService.deleteAllTasks(instanceId);
log.error("[TaskTracker-{}] delete tasks from database failed, shutdown TaskTracker failed.", instanceId);
return;
}else {
log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId);
}
@ -303,20 +308,27 @@ public abstract class TaskTracker {
*/
protected void dispatchTask(TaskDO task, String processorTrackerAddress) {
TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task);
String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath);
ptActor.tell(startTaskReq, null);
// 更新 ProcessorTrackerStatus 状态
ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
// 更新数据库如果更新数据库失败可能导致重复执行先不处理
// 1. 持久化更新数据库如果更新数据库失败可能导致重复执行先不处理
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
// 写入处理该任务的 ProcessorTracker
updateEntity.setAddress(processorTrackerAddress);
taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
if (!success) {
log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName());
return;
}
// 2. 更新 ProcessorTrackerStatus 状态
ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
// 3. 初始化缓存
taskId2LastReportTime.put(task.getTaskId(), -1L);
// 4. 任务派发
TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task);
String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath);
ptActor.tell(startTaskReq, null);
log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
}

View File

@ -4,6 +4,9 @@ import com.github.kfcfans.oms.worker.core.processor.built.PythonProcessor;
import com.github.kfcfans.oms.worker.core.processor.built.ShellProcessor;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 测试脚本处理器
*
@ -12,24 +15,27 @@ import org.junit.jupiter.api.Test;
*/
public class ScriptProcessorTest {
private static final long timeout = 10000;
private static final ExecutorService pool = Executors.newFixedThreadPool(3);
@Test
public void testLocalShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(1L, "ls -a");
ShellProcessor sp = new ShellProcessor(1L, "ls -a", timeout, pool);
sp.process(null);
ShellProcessor sp2 = new ShellProcessor(2777L, "pwd");
ShellProcessor sp2 = new ShellProcessor(2777L, "pwd", timeout, pool);
sp2.process(null);
}
@Test
public void testLocalPythonProcessor() throws Exception {
PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'");
PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'", timeout, pool);
pp.process(null);
}
@Test
public void testNetShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh");
ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh", timeout, pool);
sp.process(null);
}

View File

@ -51,8 +51,8 @@ java.lang.RuntimeException: create root task failed.
解决方案印度Windows上getSystemLoadAverage()固定返回-1...太坑了...先做个保护性判断继续测试吧...
#### 未知的数组越界问题(可能是数据库性能问题)
秒级Broadcast任务在第四次执行时当Processor完成执行上报状态时TaskTracker报错错误的本质原因是无法从数据库中找到这个task对应的记录...
时间表达式FIX_DELAY对应的TaskTracker为FrequentTaskTracker
问题:秒级Broadcast任务在第四次执行时当Processor完成执行上报状态时TaskTracker报错错误的本质原因是无法从数据库中找到这个task对应的记录...
场景:时间表达式FIX_DELAY对应的TaskTracker为FrequentTaskTracker
异常堆栈
```text
@ -90,4 +90,5 @@ java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
2020-04-16 18:05:09 WARN - [TaskTracker-1586857062542] query TaskStatus from DB failed when try to update new TaskStatus(taskId=4,newStatus=6).
```
```
解决方案初步怀疑在连续更改时由于数据库锁的存在导致行不可见不知道H2具体的特性。因此需要保证同一个taskId串行更新 -> synchronize Yes

View File

@ -6,7 +6,7 @@
<groupId>com.github.kfcfans</groupId>
<artifactId>oh-my-scheduler</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.0</version>
<modules>
<module>oh-my-scheduler-worker</module>
<module>oh-my-scheduler-server</module>