perf: Discarding the results of the map task to improve performance

This commit is contained in:
tjq 2024-02-04 22:17:53 +08:00
parent 9b7c237cf0
commit ff84d46713
2 changed files with 8 additions and 4 deletions

View File

@ -2,6 +2,7 @@ package tech.powerjob.worker.core.tracker.task;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.ServerScheduleJobReq;
@ -33,6 +34,7 @@ public abstract class TaskTracker {
* 任务实例信息 * 任务实例信息
*/ */
protected final InstanceInfo instanceInfo; protected final InstanceInfo instanceInfo;
protected final ExecuteType executeType;
/** /**
* 追加的工作流上下文数据 * 追加的工作流上下文数据
* *
@ -75,6 +77,9 @@ public abstract class TaskTracker {
instanceInfo.setLogConfig(req.getLogConfig()); instanceInfo.setLogConfig(req.getLogConfig());
instanceInfo.setInstanceTimeoutMS(req.getInstanceTimeoutMS()); instanceInfo.setInstanceTimeoutMS(req.getInstanceTimeoutMS());
// 常用变量初始化
executeType = ExecuteType.valueOf(req.getExecuteType());
// 特殊处理超时时间 // 特殊处理超时时间
if (instanceInfo.getInstanceTimeoutMS() <= 0) { if (instanceInfo.getInstanceTimeoutMS() <= 0) {
instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE); instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE);

View File

@ -83,7 +83,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress()); this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress());
this.taskPersistenceService = workerRuntime.getTaskPersistenceService(); this.taskPersistenceService = workerRuntime.getTaskPersistenceService();
// 构建缓存 // 构建缓存
taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).build(); taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).softValues().build();
// 构建分段锁 // 构建分段锁
segmentLock = new SegmentLock(UPDATE_CONCURRENCY); segmentLock = new SegmentLock(UPDATE_CONCURRENCY);
@ -226,7 +226,6 @@ public abstract class HeavyTaskTracker extends TaskTracker {
3. 广播任务每台机器都需要执行因此不应该重新分配worker广播任务不应当修改地址 3. 广播任务每台机器都需要执行因此不应该重新分配worker广播任务不应当修改地址
*/ */
String taskName = taskOpt.get().getTaskName(); String taskName = taskOpt.get().getTaskName();
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) { if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
} }
@ -243,8 +242,8 @@ public abstract class HeavyTaskTracker extends TaskTracker {
} }
} }
// 更新状态失败重试写入DB失败的也就不重试了...谁让你那么倒霉呢... // 更新状态失败重试写入DB失败的也就不重试了...谁让你那么倒霉呢...24.2.4 更新大规模 MAP 任务追求极限性能不持久化无用的子任务 result
result = result == null ? "" : result; result = result == null || ExecuteType.MAP.equals(executeType) ? "" : result;
boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result); boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);
if (!updateResult) { if (!updateResult) {