From 2833aae9b6b04cdd18286bacfabe4f0714e51b79 Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 27 May 2020 13:51:28 +0800 Subject: [PATCH] [dev] opt clean strategy --- .../WorkflowInstanceInfoRepository.java | 12 +++++++++++ .../service/instance/InstanceManager.java | 12 +++++++++-- .../server/service/timing/CleanService.java | 21 +++++++++++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInstanceInfoRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInstanceInfoRepository.java index d94bd023..31a39b23 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInstanceInfoRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/repository/WorkflowInstanceInfoRepository.java @@ -2,6 +2,11 @@ package com.github.kfcfans.oms.server.persistence.core.repository; import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; + +import javax.transaction.Transactional; +import java.util.Date; /** * 工作流运行实例数据操作 @@ -10,4 +15,11 @@ import org.springframework.data.jpa.repository.JpaRepository; * @since 2020/5/26 */ public interface WorkflowInstanceInfoRepository extends JpaRepository { + + // 删除历史数据,JPA自带的删除居然是根据ID循环删,2000条数据删了几秒,也太拉垮了吧... + // 结果只能用 int 接收 + @Modifying + @Transactional + @Query(value = "delete from workflow_instance_info where gmt_modified < ?1", nativeQuery = true) + int deleteAllByGmtModifiedBefore(Date time); } diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java index 7e7bd90f..c71aa45b 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/instance/InstanceManager.java @@ -36,9 +36,9 @@ import java.util.concurrent.TimeUnit; public class InstanceManager { // 存储 instanceId 对应的 Job 信息,便于重试 - private static final Map instanceId2JobInfo = Maps.newConcurrentMap(); + private static Map instanceId2JobInfo = Maps.newConcurrentMap(); // 存储 instance 的状态(暂时只用到了 lastReportTime) - private static final Map instanceId2StatusHolder = Maps.newConcurrentMap(); + private static Map instanceId2StatusHolder = Maps.newConcurrentMap(); // Spring Bean private static DispatchService dispatchService; @@ -206,6 +206,14 @@ public class InstanceManager { return null; } + /** + * 释放本地缓存,防止内存泄漏 + */ + public static void releaseInstanceInfos() { + instanceId2JobInfo = Maps.newConcurrentMap(); + instanceId2StatusHolder = Maps.newConcurrentMap(); + } + private static InstanceInfoRepository getInstanceInfoRepository() { while (instanceInfoRepository == null) { try { diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/CleanService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/CleanService.java index cdbefbb4..d3db0aae 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/CleanService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/timing/CleanService.java @@ -2,8 +2,10 @@ package com.github.kfcfans.oms.server.service.timing; import com.github.kfcfans.oms.server.common.utils.OmsFileUtils; import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository; +import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.oms.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.oms.server.service.ha.WorkerManagerService; +import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import lombok.extern.slf4j.Slf4j; @@ -31,6 +33,8 @@ public class CleanService { private GridFsManager gridFsManager; @Resource private InstanceInfoRepository instanceInfoRepository; + @Resource + private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; @Value("${oms.log.retention.local}") private int localLogRetentionDay; @@ -54,14 +58,20 @@ public class CleanService { @Scheduled(cron = CLEAN_TIME_EXPRESSION) public void timingClean() { + // 释放本地缓存 WorkerManagerService.releaseContainerInfos(); + InstanceManager.releaseInstanceInfos(); + // 删除数据库运行记录 cleanInstanceLog(); + cleanWorkflowInstanceLog(); + // 释放磁盘空间 cleanLocal(OmsFileUtils.genLogDirPath(), localLogRetentionDay); cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay); cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY); + // 删除 GridFS 过期文件 cleanRemote(GridFsManager.LOG_BUCKET, remoteLogRetentionDay); cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay); } @@ -127,4 +137,15 @@ public class CleanService { } } + @VisibleForTesting + public void cleanWorkflowInstanceLog() { + try { + Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay); + int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBefore(t); + log.info("[CleanService] deleted {} workflow instanceInfo records whose modify time before {}.", num, t); + }catch (Exception e) { + log.warn("[CleanService] clean workflow instanceInfo failed.", e); + } + } + }