release threadLocal store

This commit is contained in:
tjq 2020-03-28 15:40:17 +08:00
parent be4e41692d
commit 8dc6772767
4 changed files with 122 additions and 100 deletions

View File

@@ -6,15 +6,36 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* 存储一些不方便直接传递的东西
* #attention警惕内存泄漏问题最好在 ProcessorTracker destroy 执行 remove
* #attention警惕内存泄漏问题执行完毕后手动释放
*
* @author tjq
* @since 2020/3/18
*/
public class ThreadLocalStore {
public static final ThreadLocal<TaskDO> TASK_THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<TaskDO> TASK_THREAD_LOCAL = new ThreadLocal<>();
public static final ThreadLocal<AtomicLong> TASK_ID_THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<AtomicLong> TASK_ID_THREAD_LOCAL = new ThreadLocal<>();
public static TaskDO getTask() {
return TASK_THREAD_LOCAL.get();
}
public static void setTask(TaskDO task) {
TASK_THREAD_LOCAL.set(task);
}
public static AtomicLong getTaskIDAddr() {
if (TASK_ID_THREAD_LOCAL.get() == null) {
TASK_ID_THREAD_LOCAL.set(new AtomicLong(0));
}
return TASK_ID_THREAD_LOCAL.get();
}
public static void clear() {
TASK_ID_THREAD_LOCAL.remove();
TASK_THREAD_LOCAL.remove();
}
}

View File

@@ -21,12 +21,10 @@ import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
import com.github.kfcfans.oms.worker.sdk.api.MapReduceProcessor;
import com.google.common.base.Stopwatch;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* Processor 执行器
@@ -39,121 +37,113 @@ import java.util.concurrent.atomic.AtomicLong;
public class ProcessorRunnable implements Runnable {
private InstanceInfo instanceInfo;
private final InstanceInfo instanceInfo;
private final ActorSelection taskTrackerActor;
@Getter
private final TaskDO task;
@Override
public void run() {
public void innerRun() {
String taskId = task.getTaskId();
String instanceId = task.getInstanceId();
log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
try {
// 0. 完成执行上下文准备 & 上报执行信息
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(task, taskContext);
taskContext.setMaxRetryTimes(instanceInfo.getTaskRetryNum());
taskContext.setCurrentRetryTimes(task.getFailedCnt());
taskContext.setJobParams(instanceInfo.getJobParams());
taskContext.setInstanceParams(instanceInfo.getInstanceParams());
if (task.getTaskContent() != null && task.getTaskContent().length > 0) {
taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent()));
}
ThreadLocalStore.TASK_THREAD_LOCAL.set(task);
ThreadLocalStore.TASK_ID_THREAD_LOCAL.set(new AtomicLong(0));
// 0. 完成执行上下文准备 & 上报执行信息
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(task, taskContext);
taskContext.setMaxRetryTimes(instanceInfo.getTaskRetryNum());
taskContext.setCurrentRetryTimes(task.getFailedCnt());
taskContext.setJobParams(instanceInfo.getJobParams());
taskContext.setInstanceParams(instanceInfo.getInstanceParams());
if (task.getTaskContent() != null && task.getTaskContent().length > 0) {
taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent()));
}
ThreadLocalStore.setTask(task);
reportStatus(TaskStatus.WORKER_PROCESSING, null);
reportStatus(TaskStatus.WORKER_PROCESSING, null);
// 1. 获取 Processor
BasicProcessor processor = getProcessor();
if (processor == null) {
reportStatus(TaskStatus.WORKER_PROCESS_FAILED, "NO_PROCESSOR");
return;
}
// 1. 获取 Processor
BasicProcessor processor = getProcessor();
if (processor == null) {
reportStatus(TaskStatus.WORKER_PROCESS_FAILED, "NO_PROCESSOR");
return;
}
// 2. 根任务特殊处理
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
if (TaskConstant.ROOT_TASK_ID.equals(taskId)) {
// 2. 根任务特殊处理
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
if (TaskConstant.ROOT_TASK_ID.equals(taskId)) {
// 广播执行先选本机执行 preProcess完成后TaskTracker再为所有Worker生成子Task
if (executeType == ExecuteType.BROADCAST) {
// 广播执行先选本机执行 preProcess完成后TaskTracker再为所有Worker生成子Task
if (executeType == ExecuteType.BROADCAST) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
spReq.setTaskId(taskId);
spReq.setInstanceId(instanceId);
try {
ProcessResult processResult = broadcastProcessor.preProcess(taskContext);
spReq.setSuccess(processResult.isSuccess());
spReq.setMsg(processResult.getMsg());
}catch (Exception e) {
log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
spReq.setSuccess(false);
spReq.setMsg(e.toString());
}
taskTrackerActor.tell(spReq, null);
// 广播执行的第一个 task 只执行 preProcess 部分
return;
}
}
// 3. 最终任务特殊处理一定和 TaskTracker 处于相同的机器
if (TaskConstant.LAST_TASK_ID.equals(taskId)) {
Stopwatch stopwatch = Stopwatch.createStarted();
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
ProcessResult lastResult;
Map<String, String> taskId2ResultMap = TaskPersistenceService.INSTANCE.getTaskId2ResultMap(instanceId);
// 去除本任务
taskId2ResultMap.remove(TaskConstant.LAST_TASK_ID);
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
spReq.setTaskId(taskId);
spReq.setInstanceId(instanceId);
try {
switch (executeType) {
case BROADCAST:
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
lastResult = broadcastProcessor.postProcess(taskContext, taskId2ResultMap);
break;
case MAP_REDUCE:
MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
lastResult = mapReduceProcessor.reduce(taskContext, taskId2ResultMap);
break;
default:
lastResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
}
ProcessResult processResult = broadcastProcessor.preProcess(taskContext);
spReq.setSuccess(processResult.isSuccess());
spReq.setMsg(processResult.getMsg());
}catch (Exception e) {
lastResult = new ProcessResult(false, e.toString());
log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
spReq.setSuccess(false);
spReq.setMsg(e.toString());
}
TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, lastResult.getMsg());
taskTrackerActor.tell(spReq, null);
log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
// 广播执行的第一个 task 只执行 preProcess 部分
return;
}
// 4. 正式提交运行
ProcessResult processResult;
try {
processResult = processor.process(taskContext);
}catch (Exception e) {
log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e);
processResult = new ProcessResult(false, e.toString());
}
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, processResult.getMsg());
}catch (Exception e) {
log.error("[ProcessorRunnable-{}] execute failed, please fix this bug!", instanceId, e);
}
// 3. 最终任务特殊处理一定和 TaskTracker 处于相同的机器
if (TaskConstant.LAST_TASK_ID.equals(taskId)) {
Stopwatch stopwatch = Stopwatch.createStarted();
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
ProcessResult lastResult;
Map<String, String> taskId2ResultMap = TaskPersistenceService.INSTANCE.getTaskId2ResultMap(instanceId);
// 去除本任务
taskId2ResultMap.remove(TaskConstant.LAST_TASK_ID);
try {
switch (executeType) {
case BROADCAST:
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
lastResult = broadcastProcessor.postProcess(taskContext, taskId2ResultMap);
break;
case MAP_REDUCE:
MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
lastResult = mapReduceProcessor.reduce(taskContext, taskId2ResultMap);
break;
default:
lastResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
}
}catch (Exception e) {
lastResult = new ProcessResult(false, e.toString());
log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
}
TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, lastResult.getMsg());
log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
return;
}
// 4. 正式提交运行
ProcessResult processResult;
try {
processResult = processor.process(taskContext);
}catch (Exception e) {
log.warn("[ProcessorRunnable-{}] task({}) process failed.", instanceId, taskContext.getDescription(), e);
processResult = new ProcessResult(false, e.toString());
}
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, processResult.getMsg());
}
private BasicProcessor getProcessor() {
@@ -193,4 +183,15 @@ public class ProcessorRunnable implements Runnable {
taskTrackerActor.tell(req, null);
}
@Override
public void run() {
try {
innerRun();
}catch (Exception e) {
log.error("[ProcessorRunnable-{}] execute failed, please fix this bug @tjq!", task.getInstanceId(), e);
}finally {
ThreadLocalStore.clear();
}
}
}

View File

@@ -42,7 +42,7 @@ public class ProcessorMapTaskRequest implements Serializable {
subTaskList.forEach(subTask -> {
// 同一个 Task 内部可能多次 Map因此还是要确保线程级别的唯一
String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.TASK_ID_THREAD_LOCAL.get().getAndIncrement();
String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.getTaskIDAddr().getAndIncrement();
// 写入类名方便反序列化
subTasks.add(new SubTask(subTaskId, SerializerUtils.serialize(subTask)));
});

View File

@@ -49,7 +49,7 @@ public abstract class MapReduceProcessor implements BasicProcessor {
log.warn("[MapReduceProcessor] map task size is too large, network maybe overload... please try to split the tasks.");
}
TaskDO task = ThreadLocalStore.TASK_THREAD_LOCAL.get();
TaskDO task = ThreadLocalStore.getTask();
// 1. 构造请求
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
@@ -74,7 +74,7 @@ public abstract class MapReduceProcessor implements BasicProcessor {
}
public boolean isRootTask() {
TaskDO task = ThreadLocalStore.TASK_THREAD_LOCAL.get();
TaskDO task = ThreadLocalStore.getTask();
return TaskConstant.ROOT_TASK_ID.equals(task.getTaskId());
}