diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java
index 02c4c29a..f4dbd2fa 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java
@@ -6,15 +6,36 @@ import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Stores things that are inconvenient to pass around directly
- * #attention: beware of memory leaks; ideally execute remove when the ProcessorTracker is destroyed
+ * #attention: beware of memory leaks; release manually once execution finishes
  *
  * @author tjq
 
 * @since 2020/3/18
  */
 public class ThreadLocalStore {
 
-    public static final ThreadLocal<TaskDO> TASK_THREAD_LOCAL = new ThreadLocal<>();
+    private static final ThreadLocal<TaskDO> TASK_THREAD_LOCAL = new ThreadLocal<>();
 
-    public static final ThreadLocal<AtomicLong> TASK_ID_THREAD_LOCAL = new ThreadLocal<>();
+    private static final ThreadLocal<AtomicLong> TASK_ID_THREAD_LOCAL = new ThreadLocal<>();
+
+
+    public static TaskDO getTask() {
+        return TASK_THREAD_LOCAL.get();
+    }
+
+    public static void setTask(TaskDO task) {
+        TASK_THREAD_LOCAL.set(task);
+    }
+
+    public static AtomicLong getTaskIDAddr() {
+        if (TASK_ID_THREAD_LOCAL.get() == null) {
+            TASK_ID_THREAD_LOCAL.set(new AtomicLong(0));
+        }
+        return TASK_ID_THREAD_LOCAL.get();
+    }
+
+    public static void clear() {
+        TASK_ID_THREAD_LOCAL.remove();
+        TASK_THREAD_LOCAL.remove();
+    }
 
 }
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
index 77daf022..7d2d5995 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java
@@ -21,12 +21,10 @@ import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
 import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor;
 import com.google.common.base.Stopwatch;
 import lombok.AllArgsConstructor;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
 
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Processor executor
@@ -39,121 +37,113 @@ import java.util.concurrent.atomic.AtomicLong;
 public class ProcessorRunnable implements Runnable {
 
 
-    private InstanceInfo instanceInfo;
+    private final InstanceInfo instanceInfo;
     private final ActorSelection taskTrackerActor;
-    @Getter
     private final TaskDO task;
 
-    @Override
-    public void run() {
+    public void innerRun() {
 
         String taskId = task.getTaskId();
         String instanceId = task.getInstanceId();
 
         log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
 
-        try {
-            // 0. Prepare the execution context & report the execution status
-            TaskContext taskContext = new TaskContext();
-            BeanUtils.copyProperties(task, taskContext);
-            taskContext.setMaxRetryTimes(instanceInfo.getTaskRetryNum());
-            taskContext.setCurrentRetryTimes(task.getFailedCnt());
-            taskContext.setJobParams(instanceInfo.getJobParams());
-            taskContext.setInstanceParams(instanceInfo.getInstanceParams());
-            if (task.getTaskContent() != null && task.getTaskContent().length > 0) {
-                taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent()));
-            }
-            ThreadLocalStore.TASK_THREAD_LOCAL.set(task);
-            ThreadLocalStore.TASK_ID_THREAD_LOCAL.set(new AtomicLong(0));
+        // 0. Prepare the execution context & report the execution status
+        TaskContext taskContext = new TaskContext();
+        BeanUtils.copyProperties(task, taskContext);
+        taskContext.setMaxRetryTimes(instanceInfo.getTaskRetryNum());
+        taskContext.setCurrentRetryTimes(task.getFailedCnt());
+        taskContext.setJobParams(instanceInfo.getJobParams());
+        taskContext.setInstanceParams(instanceInfo.getInstanceParams());
+        if (task.getTaskContent() != null && task.getTaskContent().length > 0) {
+            taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent()));
+        }
+        ThreadLocalStore.setTask(task);
 
-            reportStatus(TaskStatus.WORKER_PROCESSING, null);
+        reportStatus(TaskStatus.WORKER_PROCESSING, null);
 
-            // 1. Obtain the Processor
-            BasicProcessor processor = getProcessor();
-            if (processor == null) {
-                reportStatus(TaskStatus.WORKER_PROCESS_FAILED, "NO_PROCESSOR");
-                return;
-            }
+        // 1. Obtain the Processor
+        BasicProcessor processor = getProcessor();
+        if (processor == null) {
+            reportStatus(TaskStatus.WORKER_PROCESS_FAILED, "NO_PROCESSOR");
+            return;
+        }
 
-            // 2. Special handling for the root task
-            ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
-            if (TaskConstant.ROOT_TASK_ID.equals(taskId)) {
+        // 2. Special handling for the root task
+        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
+        if (TaskConstant.ROOT_TASK_ID.equals(taskId)) {
 
-                // Broadcast execution: run preProcess on this machine first; afterwards the TaskTracker generates sub-tasks for all Workers
-                if (executeType == ExecuteType.BROADCAST) {
+            // Broadcast execution: run preProcess on this machine first; afterwards the TaskTracker generates sub-tasks for all Workers
+            if (executeType == ExecuteType.BROADCAST) {
 
-                    BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
-                    BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
-                    spReq.setTaskId(taskId);
-                    spReq.setInstanceId(instanceId);
-
-                    try {
-                        ProcessResult processResult = broadcastProcessor.preProcess(taskContext);
-                        spReq.setSuccess(processResult.isSuccess());
-                        spReq.setMsg(processResult.getMsg());
-                    }catch (Exception e) {
-                        log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
-                        spReq.setSuccess(false);
-                        spReq.setMsg(e.toString());
-                    }
-
-                    taskTrackerActor.tell(spReq, null);
-
-                    // the first task of a broadcast execution only runs the preProcess part
-                    return;
-                }
-            }
-
-            // 3. Special handling for the final task (always on the same machine as the TaskTracker)
-            if (TaskConstant.LAST_TASK_ID.equals(taskId)) {
-
-                Stopwatch stopwatch = Stopwatch.createStarted();
-                log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
-
-                ProcessResult lastResult;
-                Map taskId2ResultMap = TaskPersistenceService.INSTANCE.getTaskId2ResultMap(instanceId);
-                // remove this task itself
-                taskId2ResultMap.remove(TaskConstant.LAST_TASK_ID);
+                BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
+                BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
+                spReq.setTaskId(taskId);
+                spReq.setInstanceId(instanceId);
 
                 try {
-                    switch (executeType) {
-                        case BROADCAST:
-                            BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
-                            lastResult = broadcastProcessor.postProcess(taskContext, taskId2ResultMap);
-                            break;
-                        case MAP_REDUCE:
-                            MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
-                            lastResult = mapReduceProcessor.reduce(taskContext, taskId2ResultMap);
-                            break;
-                        default:
-                            lastResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
-                    }
+                    ProcessResult processResult = broadcastProcessor.preProcess(taskContext);
+                    spReq.setSuccess(processResult.isSuccess());
+                    spReq.setMsg(processResult.getMsg());
                 }catch (Exception e) {
-                    lastResult = new ProcessResult(false, e.toString());
-                    log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
+                    log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
+                    spReq.setSuccess(false);
+                    spReq.setMsg(e.toString());
                 }
 
-                TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
-                reportStatus(status, lastResult.getMsg());
+                taskTrackerActor.tell(spReq, null);
 
-                log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
+                // the first task of a broadcast execution only runs the preProcess part
                 return;
             }
-
-
-            // 4. Formally submit and run
-            ProcessResult processResult;
-            try {
-                processResult = processor.process(taskContext);
-            }catch (Exception e) {
-                log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e);
-                processResult = new ProcessResult(false, e.toString());
-            }
-            reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, processResult.getMsg());
-
-        }catch (Exception e) {
-            log.error("[ProcessorRunnable-{}] execute failed, please fix this bug!", instanceId, e);
         }
+
+        // 3. Special handling for the final task (always on the same machine as the TaskTracker)
+        if (TaskConstant.LAST_TASK_ID.equals(taskId)) {
+
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
+
+            ProcessResult lastResult;
+            Map taskId2ResultMap = TaskPersistenceService.INSTANCE.getTaskId2ResultMap(instanceId);
+            // remove this task itself
+            taskId2ResultMap.remove(TaskConstant.LAST_TASK_ID);
+
+            try {
+                switch (executeType) {
+                    case BROADCAST:
+                        BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
+                        lastResult = broadcastProcessor.postProcess(taskContext, taskId2ResultMap);
+                        break;
+                    case MAP_REDUCE:
+                        MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
+                        lastResult = mapReduceProcessor.reduce(taskContext, taskId2ResultMap);
+                        break;
+                    default:
+                        lastResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
+                }
+            }catch (Exception e) {
+                lastResult = new ProcessResult(false, e.toString());
+                log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
+            }
+
+            TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
+            reportStatus(status, lastResult.getMsg());
+
+            log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
+            return;
+        }
+
+
+        // 4. Formally submit and run
+        ProcessResult processResult;
+        try {
+            processResult = processor.process(taskContext);
+        }catch (Exception e) {
+            log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e);
+            processResult = new ProcessResult(false, e.toString());
+        }
+        reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, processResult.getMsg());
     }
 
     private BasicProcessor getProcessor() {
@@ -193,4 +183,15 @@ public class ProcessorRunnable implements Runnable {
 
         taskTrackerActor.tell(req, null);
     }
+
+    @Override
+    public void run() {
+        try {
+            innerRun();
+        }catch (Exception e) {
+            log.error("[ProcessorRunnable-{}] execute failed, please fix this bug @tjq!", task.getInstanceId(), e);
+        }finally {
+            ThreadLocalStore.clear();
+        }
+    }
 }
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java
index 8b363fe4..f7cda7de 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ProcessorMapTaskRequest.java
@@ -42,7 +42,7 @@ public class ProcessorMapTaskRequest implements Serializable {
 
         subTaskList.forEach(subTask -> {
             // the same Task may call map several times, so IDs must still be unique at the thread level
-            String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.TASK_ID_THREAD_LOCAL.get().getAndIncrement();
+            String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.getTaskIDAddr().getAndIncrement();
             // write the class name to make deserialization easier
             subTasks.add(new SubTask(subTaskId, SerializerUtils.serialize(subTask)));
         });
diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java
index d131b60e..e8a64c6e 100644
--- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java
+++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java
@@ -49,7 +49,7 @@ public abstract class MapReduceProcessor implements BasicProcessor {
             log.warn("[MapReduceProcessor] map task size is too large, network maybe overload... please try to split the tasks.");
         }
 
-        TaskDO task = ThreadLocalStore.TASK_THREAD_LOCAL.get();
+        TaskDO task = ThreadLocalStore.getTask();
 
         // 1. build the request
         ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
@@ -74,7 +74,7 @@ public abstract class MapReduceProcessor implements BasicProcessor {
     }
 
     public boolean isRootTask() {
-        TaskDO task = ThreadLocalStore.TASK_THREAD_LOCAL.get();
+        TaskDO task = ThreadLocalStore.getTask();
         return TaskConstant.ROOT_TASK_ID.equals(task.getTaskId());
     }
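
Illustration (not part of the patch): ProcessorRunnable runs on pooled worker threads, and a ThreadLocal that is never removed survives into whatever task reuses that thread next, which is the leak the #attention comment warns about and the reason run() now calls ThreadLocalStore.clear() in a finally block. The sketch below is a hypothetical, self-contained demo using a stand-in TASK ThreadLocal and a single-thread pool rather than the project's own classes.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; TASK stands in for the ThreadLocals held by ThreadLocalStore.
public class ThreadLocalLeakDemo {

    private static final ThreadLocal<String> TASK = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        // one worker thread, so every Runnable reuses the same thread
        ExecutorService pool = Executors.newFixedThreadPool(1);

        pool.execute(() -> TASK.set("task-A"));                 // forgets to clean up
        pool.execute(() -> System.out.println(TASK.get()));     // prints "task-A": state leaked into the next task

        pool.execute(() -> {
            try {
                TASK.set("task-B");                             // roughly what innerRun() does via setTask
            } finally {
                TASK.remove();                                  // roughly what run() now does via clear()
            }
        });
        pool.execute(() -> System.out.println(TASK.get()));     // prints "null": nothing leaks

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}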