From c35ae19ba8caeee8ef227e6db545de67dce9467c Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 25 Feb 2024 21:58:25 +0800 Subject: [PATCH] fix: Some issues found by codereview --- .../remote/http/HttpVertxCSInitializer.java | 16 +++++++++ .../processors/MapReduceProcessorDemo.java | 2 +- .../tracker/task/heavy/CommonTaskTracker.java | 2 +- .../tracker/task/heavy/HeavyTaskTracker.java | 2 +- .../persistence/DbTaskPersistenceService.java | 2 +- .../PersistenceServiceManager.java | 4 +++ .../SwapTaskPersistenceService.java | 33 +++++++++++++++++++ 7 files changed, 57 insertions(+), 4 deletions(-) diff --git a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java index 9d37a6e8..34117c44 100644 --- a/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java +++ b/powerjob-remote/powerjob-remote-impl-http/src/main/java/tech/powerjob/remote/http/HttpVertxCSInitializer.java @@ -1,5 +1,8 @@ package tech.powerjob.remote.http; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationFeature; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -58,6 +61,19 @@ public class HttpVertxCSInitializer implements CSInitializer { @Override public void init(CSInitializerConfig config) { this.config = config; + + // 【Vertx 版本升级时必须注意】临时解决 vertx 自带的 jackson 序列化无法支持字段升级问题(默认特性居然是不支持增删字段的序列化方式,外国框架也是一坨...) + try { + io.vertx.core.json.jackson.DatabindCodec.mapper() + .configure(com.fasterxml.jackson.databind.MapperFeature.PROPAGATE_TRANSIENT_MARKER, true) + .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) + .configure(JsonParser.Feature.IGNORE_UNDEFINED, true) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setSerializationInclusion(JsonInclude.Include.NON_NULL); + } catch (Throwable t) { + log.warn("[HttpVertxCSInitializer] hack jackson failed!", t); + } + vertx = VertxInitializer.buildVertx(); httpServer = VertxInitializer.buildHttpServer(vertx); httpClient = VertxInitializer.buildHttpClient(vertx); diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java index de3315c5..d2cec295 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java @@ -173,7 +173,7 @@ public class MapReduceProcessorDemo implements MapReduceProcessor { */ @Data @AllArgsConstructor - private static class SubTask implements Serializable { + public static class SubTask implements Serializable { /** * 再次强调,一定要有无参构造方法 diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index 1553d2cd..2ad7ac80 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -185,7 +185,7 @@ public class CommonTaskTracker extends HeavyTaskTracker { String result = null; // 2. 如果未完成任务数为0,判断是否真正结束,并获取真正结束任务的执行结果 - if (unfinishedNum == 0) { + if (unfinishedNum <= 0) { // 数据库中一个任务都没有,说明根任务创建失败,该任务实例失败 if (finishedNum == 0) { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 50a4f4ea..e2d383cd 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -59,7 +59,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { /** * 数据库持久化服务 */ - protected TaskPersistenceService taskPersistenceService; + protected final TaskPersistenceService taskPersistenceService; /** * 定时任务线程池 */ diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java index 4e3e94ed..fad8ae37 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/DbTaskPersistenceService.java @@ -23,7 +23,7 @@ import java.util.Optional; import java.util.function.Consumer; /** - * desc + * 基于内置数据库的任务持久化服务 * * @author tjq * @since 2024/2/23 diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java index 5d5ad1e8..ebe8d585 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java @@ -18,6 +18,10 @@ public class PersistenceServiceManager { INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.put(instanceId, taskPersistenceService); } + public static void unregister(Long instanceId) { + INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.remove(instanceId); + } + public static TaskPersistenceService fetchTaskPersistenceService(Long instanceId) { return INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.get(instanceId); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java index 6f149d14..54930391 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java @@ -52,6 +52,15 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { private volatile boolean finished = false; private ExternalTaskPersistenceService externalTaskPersistenceService; + /** + * 保险措施,当外部数据长时间空时,至少能顺利结束任务,而不是一直卡着 + */ + private long lastExternalPendingEmptyTime = -1; + private static final long MAX_EXTERNAL_PENDING_WAIT_TIME = 600000; + + /** + * 默认最大活跃任务数量 + */ private static final long DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM = 100000; /** @@ -156,6 +165,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { externalTaskPersistenceService.close(); } }); + PersistenceServiceManager.unregister(instanceId); return dbTaskPersistenceService.deleteAllTasks(instanceId); } @@ -230,11 +240,17 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { // 外部存储无数据,无需扫描 if (externalPendingRecordNum.sum() <= 0) { + lastExternalPendingEmptyTime = -1; + if (externalPendingRecordNum.sum() < 0) { + log.warn("[SwapTaskPersistenceService-{}] externalPendingRecordNum({}) < 0, maybe there's a bug!", instanceId, externalPendingRecordNum); + } return; } // 到达 DB 最大数量后跳出扫描 if (dbRecordNum.sum() > maxActiveTaskNum) { + // DB为最大数量时,说明此时任务依然满载,不需要进行空超时统计 + lastExternalPendingEmptyTime = -1; return; } @@ -242,10 +258,27 @@ public class SwapTaskPersistenceService implements TaskPersistenceService { // 队列空则跳出循环,等待下一次扫描 if (CollectionUtils.isEmpty(taskDOS)) { + + // 走到此处,会满足 DB有可用空间,当文件一直空数据。如果这个过程长期维持,则说明某些地方产生了异常导致判定失准,需要及时止损 + if (lastExternalPendingEmptyTime < 0) { + lastExternalPendingEmptyTime = System.currentTimeMillis(); + } + + // 超时机制,处理:DB 存在可导入空间但长期无法拉到数据,同时 externalPendingRecordNum 一直非0导致任务无法判定结束的情况 + long offset = System.currentTimeMillis() - lastExternalPendingEmptyTime; + if (offset > MAX_EXTERNAL_PENDING_WAIT_TIME) { + log.warn("[SwapTaskPersistenceService-{}] [moveInPendingTask] Unable to get tasks from external files for a long time, unexpected things may have happened(lastExternalPendingEmptyTime: {}, offsetFromNow: {}). System will reset externalPendingRecordNum so that the task can end(before reset externalPendingRecordNum: {}).", instanceId, lastExternalPendingEmptyTime, offset, externalPendingRecordNum); + externalPendingRecordNum.reset(); + return; + } + log.debug("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask from external is empty, finished this loop!", instanceId); return; } + // 一旦读取到数据就重置计时器 + lastExternalPendingEmptyTime = -1; + // 一旦读取,无论结果如何都直接减数量,无论后续结果如何 externalPendingRecordNum.add(-taskDOS.size());