diff --git a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java
index bd8d5b3d..cf9819a6 100644
--- a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java
+++ b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java
@@ -55,11 +55,13 @@ public class PowerJobDKey {
*/
public static final String FREQUENCY_JOB_MAX_INTERVAL = "powerjob.server.frequency-job.max-interval";
- /* ******************* 不太可能有人用的参数 ******************* */
+ /* ******************* 不太可能有人用的参数,主要方便内部测试 ******************* */
/**
* 最大活跃任务数量,超出部分 SWAP 到磁盘以提升性能
*/
- public static final String WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM = "powerjob.worker.max-active-task-num";
+ public static final String WORKER_RUNTIME_SWAP_MAX_ACTIVE_TASK_NUM = "powerjob.worker.swap.max-active-task-num";
+
+ public static final String WORKER_RUNTIME_SWAP_TASK_SCHEDULE_INTERVAL_MS = "powerjob.worker.swap.scan-interval";
}
diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java
index f249375f..de3315c5 100644
--- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java
+++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
* @since 2020/4/17
*/
@Slf4j
-@Component("testMapReduceProcessor")
+@Component("demoMapReduceProcessor")
public class MapReduceProcessorDemo implements MapReduceProcessor {
@Override
diff --git a/powerjob-worker-samples/src/main/resources/logback.xml b/powerjob-worker-samples/src/main/resources/logback.xml
index d937c028..66c67b8d 100644
--- a/powerjob-worker-samples/src/main/resources/logback.xml
+++ b/powerjob-worker-samples/src/main/resources/logback.xml
@@ -1,5 +1,10 @@
+
+
+
+
+
@@ -12,16 +17,56 @@
-
-
-
+
+
+
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${LOG_PATH}/application.log
+
+ %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+ ${LOG_PATH}/application.log.%d{yyyy-MM-dd}.%i
+
+ 500MB
+
+ 7
+
+ 3GB
+
+
+
+
+
+
+ 0
+
+ 256
+
+
+
+
+
+
+
+
+
-
-
-
-
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java
index af28b72d..9a26b85a 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java
@@ -21,14 +21,13 @@ import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import tech.powerjob.worker.extension.processor.ProcessorBean;
import tech.powerjob.worker.log.OmsLogger;
+import tech.powerjob.worker.persistence.PersistenceServiceManager;
import tech.powerjob.worker.persistence.TaskDO;
+import tech.powerjob.worker.persistence.TaskPersistenceService;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
/**
* Processor 执行器
@@ -136,7 +135,8 @@ public class HeavyProcessorRunnable implements Runnable {
Stopwatch stopwatch = Stopwatch.createStarted();
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
- List taskResults = workerRuntime.getTaskPersistenceService().getAllTaskResult(instanceId, task.getSubInstanceId());
+ TaskPersistenceService taskPersistenceService = Optional.ofNullable(PersistenceServiceManager.fetchTaskPersistenceService(instanceId)).orElse(workerRuntime.getTaskPersistenceService());
+ List taskResults = taskPersistenceService.getAllTaskResult(instanceId, task.getSubInstanceId());
try {
switch (executeType) {
case BROADCAST:
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
index 41fca7c2..50a4f4ea 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
@@ -470,7 +470,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 2. 没有可用 ProcessorTracker,本次不派发
if (availablePtIps.isEmpty()) {
- log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId);
+ log.warn("[TaskTracker-{}] no available ProcessorTracker now, skip dispatch", instanceId);
return;
}
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java
new file mode 100644
index 00000000..5d5ad1e8
--- /dev/null
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java
@@ -0,0 +1,24 @@
+package tech.powerjob.worker.persistence;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+/**
+ * 持久化器管理
+ *
+ * @author tjq
+ * @since 2024/2/25
+ */
+public class PersistenceServiceManager {
+
+ private static final Map INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE = Maps.newConcurrentMap();
+
+ public static void register(Long instanceId, TaskPersistenceService taskPersistenceService) {
+ INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.put(instanceId, taskPersistenceService);
+ }
+
+ public static TaskPersistenceService fetchTaskPersistenceService(Long instanceId) {
+ return INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.get(instanceId);
+ }
+}
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java
index 13e6a6d3..6f149d14 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java
@@ -32,6 +32,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
private final Long instanceId;
private final long maxActiveTaskNum;
+ private final long scheduleRateMs;
/**
* 数据库记录数量,不要求完全精确,仅用于控制存哪里,有一定容忍度
*/
@@ -63,9 +64,10 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
this.needResult = ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(instanceInfo.getExecuteType());
this.canUseSwap = ExecuteType.MAP.name().equalsIgnoreCase(instanceInfo.getExecuteType()) || ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(instanceInfo.getExecuteType());
this.dbTaskPersistenceService = dbTaskPersistenceService;
- this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM, String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM)));
-
- log.info("[SwapTaskPersistenceService-{}] initialized SwapTaskPersistenceService, canUseSwap: {}, needResult: {}, maxActiveTaskNum: {}", instanceId, canUseSwap, needResult, maxActiveTaskNum);
+ this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_SWAP_MAX_ACTIVE_TASK_NUM, String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM)));
+ this.scheduleRateMs = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_SWAP_TASK_SCHEDULE_INTERVAL_MS, String.valueOf(DEFAULT_SCHEDULE_TIME)));
+ PersistenceServiceManager.register(this.instanceId, this);
+ log.info("[SwapTaskPersistenceService-{}] initialized SwapTaskPersistenceService, canUseSwap: {}, needResult: {}, maxActiveTaskNum: {}, scheduleRateMs: {}", instanceId, canUseSwap, needResult, maxActiveTaskNum, scheduleRateMs);
}
@Override
@@ -139,7 +141,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
externalPendingRecordNum.add(tasks.size());
}
- log.info("[SwapTaskPersistenceService-{}] too many tasks at runtime(dbRecordNum: {}), SWAP enabled, persistence result: {}, externalPendingRecordNum: {}", instanceId, dbNum, persistPendingTaskRes, externalPendingRecordNum);
+ log.debug("[SwapTaskPersistenceService-{}] too many tasks at runtime(dbRecordNum: {}), SWAP enabled, persistence result: {}, externalPendingRecordNum: {}", instanceId, dbNum, persistPendingTaskRes, externalPendingRecordNum);
return persistPendingTaskRes;
} else {
return persistTask2Db(tasks);
@@ -214,10 +216,11 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
return;
}
- CommonUtils.easySleep(DEFAULT_SCHEDULE_TIME);
+ CommonUtils.easySleep(scheduleRateMs);
- moveInPendingTask();
+ // 顺序很关键,先移出才有空间移入
moveOutFinishedTask();
+ moveInPendingTask();
}
}
@@ -247,7 +250,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
externalPendingRecordNum.add(-taskDOS.size());
boolean persistTask2Db = persistTask2Db(taskDOS);
- log.info("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum);
+ log.debug("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum);
// 持久化失败的情况,及时跳出本次循环,防止损失扩大,等待下次扫描
if (!persistTask2Db) {
@@ -261,7 +264,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
while (true) {
- // 一旦启动 SWAP,需要移出更多的数据来灌入,最多允许
+ // 一旦启动 SWAP,需要移出更多的数据来灌入
long maxRemainNum = maxActiveTaskNum / 2;
if (dbRecordNum.sum() <= maxRemainNum) {
return;
@@ -306,10 +309,10 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
if (deleteTasksByTaskIdsResult) {
dbRecordNum.add(-moveOutNum);
- log.info("{} move task to external successfully(movedNum: {}, currentExternalRecordNum: {}, currentDbRecordNum: {})", logKey, moveOutNum, externalRecord, dbRecordNum);
+ log.debug("{} move task to external successfully(movedNum: {}, currentExternalSucceedNum: {}, currentExternalFailedNum: {}, currentDbRecordNum: {})", logKey, moveOutNum, externalSucceedRecordNum, externalFailedRecordNum, dbRecordNum);
} else {
// DB 删除失败影响不大,reduce 重复而已
- log.warn("{} persistFinishedTask to external successfully but delete in runtime failed(movedNum: {}, currentExternalRecordNum: {}, currentDbRecordNum: {}), these taskIds may have duplicate results in reduce stage: {}", logKey, moveOutNum, externalRecord, dbRecordNum, deleteTaskIds);
+ log.warn("{} persistFinishedTask to external successfully but delete in runtime failed(movedNum: {}, currentExternalSucceedNum: {}, currentExternalFailedNum: {}, currentDbRecordNum: {}), these taskIds may have duplicate results in reduce stage: {}", logKey, moveOutNum, externalSucceedRecordNum, externalFailedRecordNum, dbRecordNum, deleteTaskIds);
}
}
}
diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java
index 997ca658..e5ec13a5 100644
--- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java
+++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java
@@ -3,7 +3,9 @@ package tech.powerjob.worker.persistence.fs.impl;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.serialize.JsonUtils;
+import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService;
@@ -55,6 +57,9 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer
@Override
public boolean persistPendingTask(List tasks) {
+ if (CollectionUtils.isEmpty(tasks)) {
+ return true;
+ }
try {
String content = JsonUtils.toJSONString(tasks);
pendingFsService.writeLine(content);
@@ -69,15 +74,19 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer
@SneakyThrows
public List readPendingTask() {
String pendingTaskStr = pendingFsService.readLine();
- TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class);
- if (taskDOS != null) {
- return Lists.newArrayList(taskDOS);
- }
- return Collections.emptyList();
+ return str2TaskDoList(pendingTaskStr);
}
@Override
public boolean persistFinishedTask(List tasks) {
+
+ if (CollectionUtils.isEmpty(tasks)) {
+ return true;
+ }
+
+ // 移除无用的参数列
+ tasks.forEach(t -> t.setTaskContent(null));
+
try {
String content = JsonUtils.toJSONString(tasks);
resultFsService.writeLine(content);
@@ -91,8 +100,16 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer
@Override
@SneakyThrows
public List readFinishedTask() {
- String pendingTaskStr = resultFsService.readLine();
- TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class);
+ String finishedStr = resultFsService.readLine();
+ return str2TaskDoList(finishedStr);
+ }
+
+
+ private static List str2TaskDoList(String finishedStr) throws Exception {
+ if (StringUtils.isEmpty(finishedStr)) {
+ return Collections.emptyList();
+ }
+ TaskDO[] taskDOS = JsonUtils.parseObject(finishedStr, TaskDO[].class);
if (taskDOS != null) {
return Lists.newArrayList(taskDOS);
}