diff --git a/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java index 0480357a..b706186a 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/serialize/JsonUtils.java @@ -1,8 +1,10 @@ package tech.powerjob.common.serialize; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.json.JsonMapper; import lombok.extern.slf4j.Slf4j; @@ -28,8 +30,13 @@ public class JsonUtils { .configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true) .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) .configure(JsonParser.Feature.IGNORE_UNDEFINED, true) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .build(); + static { + JSON_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); + } + private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference> () {}; private JsonUtils(){ @@ -37,6 +44,9 @@ public class JsonUtils { } public static String toJSONString(Object obj) { + if (obj == null) { + return null; + } if (obj instanceof String) { return (String) obj; } diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java index 2f69fd4b..2c6d7720 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java @@ -25,7 +25,7 @@ import java.util.concurrent.ThreadLocalRandom; * @since 2020/4/17 */ @Slf4j -@Component +@Component("testMapReduceProcessor") public class MapReduceProcessorDemo implements MapReduceProcessor { @Override diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index 004c7d4b..bb354e7a 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -19,7 +19,10 @@ import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.persistence.SwapTaskPersistenceService; import tech.powerjob.worker.persistence.TaskDO; +import tech.powerjob.worker.persistence.TaskPersistenceService; +import tech.powerjob.worker.pojo.model.InstanceInfo; import java.util.List; import java.util.Optional; @@ -51,6 +54,11 @@ public class CommonTaskTracker extends HeavyTaskTracker { super(req, workerRuntime); } + @Override + protected TaskPersistenceService initTaskPersistenceService(InstanceInfo instanceInfo, WorkerRuntime workerRuntime) { + return new SwapTaskPersistenceService(instanceInfo, workerRuntime.getTaskPersistenceService()); + } + @Override protected void initTaskTracker(ServerScheduleJobReq req) { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 5f42c035..47568a4e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -30,6 +30,7 @@ import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; import tech.powerjob.worker.core.tracker.task.TaskTracker; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.persistence.TaskPersistenceService; +import tech.powerjob.worker.pojo.model.InstanceInfo; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; @@ -82,7 +83,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 保护性操作 instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency())); this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress()); - this.taskPersistenceService = workerRuntime.getTaskPersistenceService(); + this.taskPersistenceService = initTaskPersistenceService(instanceInfo, workerRuntime); // 构建缓存 taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).softValues().build(); @@ -95,6 +96,10 @@ public abstract class HeavyTaskTracker extends TaskTracker { log.info("[TaskTracker-{}] create TaskTracker successfully.", instanceId); } + protected TaskPersistenceService initTaskPersistenceService(InstanceInfo instanceInfo, WorkerRuntime workerRuntime) { + return workerRuntime.getTaskPersistenceService(); + } + /** * 静态方法创建 TaskTracker * diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java index 4a410162..c6571bbe 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java @@ -4,6 +4,7 @@ import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.enhance.SafeRunnable; +import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.MapUtils; @@ -11,6 +12,7 @@ import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService; import tech.powerjob.worker.persistence.fs.impl.ExternalTaskFileSystemPersistenceService; +import tech.powerjob.worker.pojo.model.InstanceInfo; import java.util.Collection; import java.util.List; @@ -42,9 +44,11 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { private final LongAdder externalFailedRecordNum = new LongAdder(); private final boolean needResult; + private final boolean canUseSwap; private final TaskPersistenceService dbTaskPersistenceService; private boolean swapEnabled; + private volatile boolean finished = false; private ExternalTaskPersistenceService externalTaskPersistenceService; private static final long DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM = 100000; @@ -54,11 +58,14 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { */ private static final long DEFAULT_SCHEDULE_TIME = 60000; - public SwapTaskPersistenceService(Long instanceId, boolean needResult, TaskPersistenceService dbTaskPersistenceService) { - this.instanceId = instanceId; - this.needResult = needResult; + public SwapTaskPersistenceService(InstanceInfo instanceInfo, TaskPersistenceService dbTaskPersistenceService) { + this.instanceId = instanceInfo.getInstanceId(); + this.needResult = ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(instanceInfo.getExecuteType()); + this.canUseSwap = ExecuteType.MAP.name().equalsIgnoreCase(instanceInfo.getExecuteType()) || ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(instanceInfo.getExecuteType()); this.dbTaskPersistenceService = dbTaskPersistenceService; this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM, String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM))); + + log.info("[SwapTaskPersistenceService-{}] initialized SwapTaskPersistenceService, canUseSwap: {}, needResult: {}, maxActiveTaskNum: {}", instanceId, canUseSwap, needResult, maxActiveTaskNum); } @Override @@ -117,7 +124,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { long dbNum = dbRecordNum.sum(); - if (dbNum > maxActiveTaskNum) { + if (canUseSwap && dbNum > maxActiveTaskNum) { // 上层保证启用 SWAP 的任务,batchSave 的都是等待调度的任务,不会参与真正的运行 boolean persistPendingTaskRes = getExternalTaskPersistenceService().persistPendingTask(tasks); @@ -136,8 +143,9 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { @Override public boolean deleteAllTasks(Long instanceId) { + finished = true; CommonUtils.executeIgnoreException(() -> { - if (externalTaskPersistenceService != null) { + if (swapEnabled) { externalTaskPersistenceService.close(); } }); @@ -195,11 +203,17 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { @Override protected void run0() { + while (true) { - CommonUtils.easySleep(DEFAULT_SCHEDULE_TIME); + if (finished) { + return; + } - moveInPendingTask(); - moveOutFinishedTask(); + CommonUtils.easySleep(DEFAULT_SCHEDULE_TIME); + + moveInPendingTask(); + moveOutFinishedTask(); + } } private void moveInPendingTask() { @@ -220,7 +234,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { // 队列空则跳出循环,等待下一次扫描 if (CollectionUtils.isEmpty(taskDOS)) { - log.debug("[YuGong-{}] [moveInPendingTask] readPendingTask from external is empty, finished this loop!", instanceId); + log.debug("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask from external is empty, finished this loop!", instanceId); return; } @@ -228,11 +242,11 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { externalPendingRecordNum.add(-taskDOS.size()); boolean persistTask2Db = persistTask2Db(taskDOS); - log.info("[YuGong-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum); + log.info("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum); // 持久化失败的情况,及时跳出本次循环,防止损失扩大,等待下次扫描 if (!persistTask2Db) { - log.error("[YuGong-{}] [moveInPendingTask] moveIn task failed, these tasks are lost: {}", instanceId, taskDOS); + log.error("[SwapTaskPersistenceService-{}] [moveInPendingTask] moveIn task failed, these tasks are lost: {}", instanceId, taskDOS); return; } } @@ -268,7 +282,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { private void moveOutDetailFinishedTask(List tasks, boolean success) { - String logKey = String.format("[YuGong-%d] [moveOut%sTask] ", instanceId, success ? "Success" : "Failed"); + String logKey = String.format("[SwapTaskPersistenceService-%d] [moveOut%sTask] ", instanceId, success ? "Success" : "Failed"); boolean persistFinishedTask2ExternalResult = getExternalTaskPersistenceService().persistFinishedTask(tasks);