From ff84d467132d042ddc94aaceb2d33c6be83ebd90 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 4 Feb 2024 22:17:53 +0800 Subject: [PATCH] perf: Discarding the results of the map task to improve performance --- .../powerjob/worker/core/tracker/task/TaskTracker.java | 5 +++++ .../worker/core/tracker/task/heavy/HeavyTaskTracker.java | 7 +++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index 58a6bc3a..dd398485 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -2,6 +2,7 @@ package tech.powerjob.worker.core.tracker.task; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerScheduleJobReq; @@ -33,6 +34,7 @@ public abstract class TaskTracker { * 任务实例信息 */ protected final InstanceInfo instanceInfo; + protected final ExecuteType executeType; /** * 追加的工作流上下文数据 * @@ -75,6 +77,9 @@ public abstract class TaskTracker { instanceInfo.setLogConfig(req.getLogConfig()); instanceInfo.setInstanceTimeoutMS(req.getInstanceTimeoutMS()); + // 常用变量初始化 + executeType = ExecuteType.valueOf(req.getExecuteType()); + // 特殊处理超时时间 if (instanceInfo.getInstanceTimeoutMS() <= 0) { instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 4f75e0ab..99f449e6 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -83,7 +83,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress()); this.taskPersistenceService = workerRuntime.getTaskPersistenceService(); // 构建缓存 - taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).build(); + taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).softValues().build(); // 构建分段锁 segmentLock = new SegmentLock(UPDATE_CONCURRENCY); @@ -226,7 +226,6 @@ public abstract class HeavyTaskTracker extends TaskTracker { 3. 广播任务每台机器都需要执行,因此不应该重新分配worker(广播任务不应当修改地址) */ String taskName = taskOpt.get().getTaskName(); - ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) { updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); } @@ -243,8 +242,8 @@ public abstract class HeavyTaskTracker extends TaskTracker { } } - // 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...) - result = result == null ? "" : result; + // 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...)(24.2.4 更新:大规模 MAP 任务追求极限性能,不持久化无用的子任务 result) + result = result == null || ExecuteType.MAP.equals(executeType) ? "" : result; boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result); if (!updateResult) {