From e64ad0f74d368e30ce4be75cbafdbbda821207e4 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 25 Feb 2024 19:13:03 +0800 Subject: [PATCH] fix: Loss of subtask data when mapreduce enters swap mode --- .../tech/powerjob/common/PowerJobDKey.java | 6 +- .../processors/MapReduceProcessorDemo.java | 2 +- .../src/main/resources/logback.xml | 65 ++++++++++++++++--- .../runnable/HeavyProcessorRunnable.java | 10 +-- .../tracker/task/heavy/HeavyTaskTracker.java | 2 +- .../PersistenceServiceManager.java | 24 +++++++ .../SwapTaskPersistenceService.java | 23 ++++--- ...ernalTaskFileSystemPersistenceService.java | 31 +++++++-- 8 files changed, 127 insertions(+), 36 deletions(-) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java index bd8d5b3d..cf9819a6 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java @@ -55,11 +55,13 @@ public class PowerJobDKey { */ public static final String FREQUENCY_JOB_MAX_INTERVAL = "powerjob.server.frequency-job.max-interval"; - /* ******************* 不太可能有人用的参数 ******************* */ + /* ******************* 不太可能有人用的参数,主要方便内部测试 ******************* */ /** * 最大活跃任务数量,超出部分 SWAP 到磁盘以提升性能 */ - public static final String WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM = "powerjob.worker.max-active-task-num"; + public static final String WORKER_RUNTIME_SWAP_MAX_ACTIVE_TASK_NUM = "powerjob.worker.swap.max-active-task-num"; + + public static final String WORKER_RUNTIME_SWAP_TASK_SCHEDULE_INTERVAL_MS = "powerjob.worker.swap.scan-interval"; } diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java index f249375f..de3315c5 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java @@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; * @since 2020/4/17 */ @Slf4j -@Component("testMapReduceProcessor") +@Component("demoMapReduceProcessor") public class MapReduceProcessorDemo implements MapReduceProcessor { @Override diff --git a/powerjob-worker-samples/src/main/resources/logback.xml b/powerjob-worker-samples/src/main/resources/logback.xml index d937c028..66c67b8d 100644 --- a/powerjob-worker-samples/src/main/resources/logback.xml +++ b/powerjob-worker-samples/src/main/resources/logback.xml @@ -1,5 +1,10 @@ + + + + + @@ -12,16 +17,56 @@ - - - + + + + + - - - + + + + + + + + + + + + + + ${LOG_PATH}/application.log + + %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n + + + + + ${LOG_PATH}/application.log.%d{yyyy-MM-dd}.%i + + 500MB + + 7 + + 3GB + + + + + + + 0 + + 256 + + + + + + + + + - - - - diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java index af28b72d..9a26b85a 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java @@ -21,14 +21,13 @@ import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor; import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; import tech.powerjob.worker.extension.processor.ProcessorBean; import tech.powerjob.worker.log.OmsLogger; +import tech.powerjob.worker.persistence.PersistenceServiceManager; import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.TaskPersistenceService; import tech.powerjob.worker.pojo.model.InstanceInfo; import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Queue; +import java.util.*; /** * Processor 执行器 @@ -136,7 +135,8 @@ public class HeavyProcessorRunnable implements Runnable { Stopwatch stopwatch = Stopwatch.createStarted(); log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId); - List taskResults = workerRuntime.getTaskPersistenceService().getAllTaskResult(instanceId, task.getSubInstanceId()); + TaskPersistenceService taskPersistenceService = Optional.ofNullable(PersistenceServiceManager.fetchTaskPersistenceService(instanceId)).orElse(workerRuntime.getTaskPersistenceService()); + List taskResults = taskPersistenceService.getAllTaskResult(instanceId, task.getSubInstanceId()); try { switch (executeType) { case BROADCAST: diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 41fca7c2..50a4f4ea 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -470,7 +470,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 2. 没有可用 ProcessorTracker,本次不派发 if (availablePtIps.isEmpty()) { - log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId); + log.warn("[TaskTracker-{}] no available ProcessorTracker now, skip dispatch", instanceId); return; } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java new file mode 100644 index 00000000..5d5ad1e8 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java @@ -0,0 +1,24 @@ +package tech.powerjob.worker.persistence; + +import com.google.common.collect.Maps; + +import java.util.Map; + +/** + * 持久化器管理 + * + * @author tjq + * @since 2024/2/25 + */ +public class PersistenceServiceManager { + + private static final Map INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE = Maps.newConcurrentMap(); + + public static void register(Long instanceId, TaskPersistenceService taskPersistenceService) { + INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.put(instanceId, taskPersistenceService); + } + + public static TaskPersistenceService fetchTaskPersistenceService(Long instanceId) { + return INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.get(instanceId); + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java index 13e6a6d3..6f149d14 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java @@ -32,6 +32,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { private final Long instanceId; private final long maxActiveTaskNum; + private final long scheduleRateMs; /** * 数据库记录数量,不要求完全精确,仅用于控制存哪里,有一定容忍度 */ @@ -63,9 +64,10 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { this.needResult = ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(instanceInfo.getExecuteType()); this.canUseSwap = ExecuteType.MAP.name().equalsIgnoreCase(instanceInfo.getExecuteType()) || ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(instanceInfo.getExecuteType()); this.dbTaskPersistenceService = dbTaskPersistenceService; - this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM, String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM))); - - log.info("[SwapTaskPersistenceService-{}] initialized SwapTaskPersistenceService, canUseSwap: {}, needResult: {}, maxActiveTaskNum: {}", instanceId, canUseSwap, needResult, maxActiveTaskNum); + this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_SWAP_MAX_ACTIVE_TASK_NUM, String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM))); + this.scheduleRateMs = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_SWAP_TASK_SCHEDULE_INTERVAL_MS, String.valueOf(DEFAULT_SCHEDULE_TIME))); + PersistenceServiceManager.register(this.instanceId, this); + log.info("[SwapTaskPersistenceService-{}] initialized SwapTaskPersistenceService, canUseSwap: {}, needResult: {}, maxActiveTaskNum: {}, scheduleRateMs: {}", instanceId, canUseSwap, needResult, maxActiveTaskNum, scheduleRateMs); } @Override @@ -139,7 +141,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { externalPendingRecordNum.add(tasks.size()); } - log.info("[SwapTaskPersistenceService-{}] too many tasks at runtime(dbRecordNum: {}), SWAP enabled, persistence result: {}, externalPendingRecordNum: {}", instanceId, dbNum, persistPendingTaskRes, externalPendingRecordNum); + log.debug("[SwapTaskPersistenceService-{}] too many tasks at runtime(dbRecordNum: {}), SWAP enabled, persistence result: {}, externalPendingRecordNum: {}", instanceId, dbNum, persistPendingTaskRes, externalPendingRecordNum); return persistPendingTaskRes; } else { return persistTask2Db(tasks); @@ -214,10 +216,11 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { return; } - CommonUtils.easySleep(DEFAULT_SCHEDULE_TIME); + CommonUtils.easySleep(scheduleRateMs); - moveInPendingTask(); + // 顺序很关键,先移出才有空间移入 moveOutFinishedTask(); + moveInPendingTask(); } } @@ -247,7 +250,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { externalPendingRecordNum.add(-taskDOS.size()); boolean persistTask2Db = persistTask2Db(taskDOS); - log.info("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum); + log.debug("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum); // 持久化失败的情况,及时跳出本次循环,防止损失扩大,等待下次扫描 if (!persistTask2Db) { @@ -261,7 +264,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { while (true) { - // 一旦启动 SWAP,需要移出更多的数据来灌入,最多允许 + // 一旦启动 SWAP,需要移出更多的数据来灌入 long maxRemainNum = maxActiveTaskNum / 2; if (dbRecordNum.sum() <= maxRemainNum) { return; @@ -306,10 +309,10 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { if (deleteTasksByTaskIdsResult) { dbRecordNum.add(-moveOutNum); - log.info("{} move task to external successfully(movedNum: {}, currentExternalRecordNum: {}, currentDbRecordNum: {})", logKey, moveOutNum, externalRecord, dbRecordNum); + log.debug("{} move task to external successfully(movedNum: {}, currentExternalSucceedNum: {}, currentExternalFailedNum: {}, currentDbRecordNum: {})", logKey, moveOutNum, externalSucceedRecordNum, externalFailedRecordNum, dbRecordNum); } else { // DB 删除失败影响不大,reduce 重复而已 - log.warn("{} persistFinishedTask to external successfully but delete in runtime failed(movedNum: {}, currentExternalRecordNum: {}, currentDbRecordNum: {}), these taskIds may have duplicate results in reduce stage: {}", logKey, moveOutNum, externalRecord, dbRecordNum, deleteTaskIds); + log.warn("{} persistFinishedTask to external successfully but delete in runtime failed(movedNum: {}, currentExternalSucceedNum: {}, currentExternalFailedNum: {}, currentDbRecordNum: {}), these taskIds may have duplicate results in reduce stage: {}", logKey, moveOutNum, externalSucceedRecordNum, externalFailedRecordNum, dbRecordNum, deleteTaskIds); } } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java index 997ca658..e5ec13a5 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java @@ -3,7 +3,9 @@ package tech.powerjob.worker.persistence.fs.impl; import com.google.common.collect.Lists; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService; @@ -55,6 +57,9 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer @Override public boolean persistPendingTask(List tasks) { + if (CollectionUtils.isEmpty(tasks)) { + return true; + } try { String content = JsonUtils.toJSONString(tasks); pendingFsService.writeLine(content); @@ -69,15 +74,19 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer @SneakyThrows public List readPendingTask() { String pendingTaskStr = pendingFsService.readLine(); - TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class); - if (taskDOS != null) { - return Lists.newArrayList(taskDOS); - } - return Collections.emptyList(); + return str2TaskDoList(pendingTaskStr); } @Override public boolean persistFinishedTask(List tasks) { + + if (CollectionUtils.isEmpty(tasks)) { + return true; + } + + // 移除无用的参数列 + tasks.forEach(t -> t.setTaskContent(null)); + try { String content = JsonUtils.toJSONString(tasks); resultFsService.writeLine(content); @@ -91,8 +100,16 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer @Override @SneakyThrows public List readFinishedTask() { - String pendingTaskStr = resultFsService.readLine(); - TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class); + String finishedStr = resultFsService.readLine(); + return str2TaskDoList(finishedStr); + } + + + private static List str2TaskDoList(String finishedStr) throws Exception { + if (StringUtils.isEmpty(finishedStr)) { + return Collections.emptyList(); + } + TaskDO[] taskDOS = JsonUtils.parseObject(finishedStr, TaskDO[].class); if (taskDOS != null) { return Lists.newArrayList(taskDOS); }