fix: Some issues found by codereview

This commit is contained in:
tjq 2024-02-25 21:58:25 +08:00
parent c11d544afe
commit c35ae19ba8
7 changed files with 57 additions and 4 deletions

View File

@ -1,5 +1,8 @@
package tech.powerjob.remote.http;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
@ -58,6 +61,19 @@ public class HttpVertxCSInitializer implements CSInitializer {
@Override
public void init(CSInitializerConfig config) {
this.config = config;
// Vertx 版本升级时必须注意临时解决 vertx 自带的 jackson 序列化无法支持字段升级问题默认特性居然是不支持增删字段的序列化方式外国框架也是一坨...
try {
io.vertx.core.json.jackson.DatabindCodec.mapper()
.configure(com.fasterxml.jackson.databind.MapperFeature.PROPAGATE_TRANSIENT_MARKER, true)
.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
.configure(JsonParser.Feature.IGNORE_UNDEFINED, true)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
} catch (Throwable t) {
log.warn("[HttpVertxCSInitializer] hack jackson failed!", t);
}
vertx = VertxInitializer.buildVertx();
httpServer = VertxInitializer.buildHttpServer(vertx);
httpClient = VertxInitializer.buildHttpClient(vertx);

View File

@ -173,7 +173,7 @@ public class MapReduceProcessorDemo implements MapReduceProcessor {
*/
@Data
@AllArgsConstructor
private static class SubTask implements Serializable {
public static class SubTask implements Serializable {
/**
* 再次强调一定要有无参构造方法

View File

@ -185,7 +185,7 @@ public class CommonTaskTracker extends HeavyTaskTracker {
String result = null;
// 2. 如果未完成任务数为0判断是否真正结束并获取真正结束任务的执行结果
if (unfinishedNum == 0) {
if (unfinishedNum <= 0) {
// 数据库中一个任务都没有说明根任务创建失败该任务实例失败
if (finishedNum == 0) {

View File

@ -59,7 +59,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
/**
* 数据库持久化服务
*/
protected TaskPersistenceService taskPersistenceService;
protected final TaskPersistenceService taskPersistenceService;
/**
* 定时任务线程池
*/

View File

@ -23,7 +23,7 @@ import java.util.Optional;
import java.util.function.Consumer;
/**
* desc
* 基于内置数据库的任务持久化服务
*
* @author tjq
* @since 2024/2/23

View File

@ -18,6 +18,10 @@ public class PersistenceServiceManager {
INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.put(instanceId, taskPersistenceService);
}
public static void unregister(Long instanceId) {
INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.remove(instanceId);
}
public static TaskPersistenceService fetchTaskPersistenceService(Long instanceId) {
return INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.get(instanceId);
}

View File

@ -52,6 +52,15 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
private volatile boolean finished = false;
private ExternalTaskPersistenceService externalTaskPersistenceService;
/**
* 保险措施当外部数据长时间空时至少能顺利结束任务而不是一直卡着
*/
private long lastExternalPendingEmptyTime = -1;
private static final long MAX_EXTERNAL_PENDING_WAIT_TIME = 600000;
/**
* 默认最大活跃任务数量
*/
private static final long DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM = 100000;
/**
@ -156,6 +165,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
externalTaskPersistenceService.close();
}
});
PersistenceServiceManager.unregister(instanceId);
return dbTaskPersistenceService.deleteAllTasks(instanceId);
}
@ -230,11 +240,17 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
// 外部存储无数据无需扫描
if (externalPendingRecordNum.sum() <= 0) {
lastExternalPendingEmptyTime = -1;
if (externalPendingRecordNum.sum() < 0) {
log.warn("[SwapTaskPersistenceService-{}] externalPendingRecordNum({}) < 0, maybe there's a bug!", instanceId, externalPendingRecordNum);
}
return;
}
// 到达 DB 最大数量后跳出扫描
if (dbRecordNum.sum() > maxActiveTaskNum) {
// DB为最大数量时说明此时任务依然满载不需要进行空超时统计
lastExternalPendingEmptyTime = -1;
return;
}
@ -242,10 +258,27 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
// 队列空则跳出循环等待下一次扫描
if (CollectionUtils.isEmpty(taskDOS)) {
// 走到此处会满足 DB有可用空间当文件一直空数据如果这个过程长期维持则说明某些地方产生了异常导致判定失准需要及时止损
if (lastExternalPendingEmptyTime < 0) {
lastExternalPendingEmptyTime = System.currentTimeMillis();
}
// 超时机制处理DB 存在可导入空间但长期无法拉到数据同时 externalPendingRecordNum 一直非0导致任务无法判定结束的情况
long offset = System.currentTimeMillis() - lastExternalPendingEmptyTime;
if (offset > MAX_EXTERNAL_PENDING_WAIT_TIME) {
log.warn("[SwapTaskPersistenceService-{}] [moveInPendingTask] Unable to get tasks from external files for a long time, unexpected things may have happened(lastExternalPendingEmptyTime: {}, offsetFromNow: {}). System will reset externalPendingRecordNum so that the task can end(before reset externalPendingRecordNum: {}).", instanceId, lastExternalPendingEmptyTime, offset, externalPendingRecordNum);
externalPendingRecordNum.reset();
return;
}
log.debug("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask from external is empty, finished this loop!", instanceId);
return;
}
// 一旦读取到数据就重置计时器
lastExternalPendingEmptyTime = -1;
// 一旦读取无论结果如何都直接减数量无论后续结果如何
externalPendingRecordNum.add(-taskDOS.size());