From 3528a7724fe086b807fd13a4ff5661adc8b726db Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 8 Jul 2020 15:09:20 +0800 Subject: [PATCH 1/3] [fix] fix the problem of log sync threadpool oversize --- .../common/config/ThreadPoolConfig.java | 24 ++++++++++++++++--- .../server/service/InstanceLogService.java | 8 +------ .../service/instance/InstanceManager.java | 2 +- .../schedule/HashedWheelTimerHolder.java | 4 ++++ 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index 3dc4fd71..fb784a0e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -8,8 +8,7 @@ import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.*; /** * 公用线程池配置 @@ -51,7 +50,19 @@ public class ThreadPoolConfig { executor.setQueueCapacity(1024); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("omsCommonPool-"); - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + executor.setRejectedExecutionHandler(new LogOnRejected()); + return executor; + } + + @Bean("omsBackgroundPool") + public Executor initBackgroundPool() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors()); + executor.setQueueCapacity(8192); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("omsBackgroundPool-"); + executor.setRejectedExecutionHandler(new LogOnRejected()); return executor; } @@ -65,4 +76,11 @@ public class ThreadPoolConfig { return scheduler; } + private static final class LogOnRejected implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor p) { + log.error("[OmsThreadPool] Task({}) rejected from pool({}).", r, p); + } + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index 025a5720..6a71ad1f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -170,15 +170,9 @@ public class InstanceLogService { * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 * @param instanceId 任务实例ID */ - @Async("omsCommonPool") + @Async("omsBackgroundPool") public void sync(Long instanceId) { - // 休眠10秒等待全部数据上报(OmsLogHandler 每隔5秒上报数据) - try { - TimeUnit.SECONDS.sleep(15); - }catch (Exception ignore) { - } - Stopwatch sw = Stopwatch.createStarted(); try { // 先持久化到本地文件 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index a49e084c..d6938678 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -143,7 +143,7 @@ public class InstanceManager { log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name()); // 上报日志数据 - instanceLogService.sync(instanceId); + HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 15, TimeUnit.SECONDS); // workflow 特殊处理 if (wfInstanceId != null) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java index 193add88..a6667d51 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java @@ -10,8 +10,12 @@ import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTime */ public class HashedWheelTimerHolder { + // 精确时间轮,每 1S 走一格 public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4); + // 非精确时间轮,每 5S 走一格 + public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0); + private HashedWheelTimerHolder() { } } From 4e1a447f50358b3b2b4edab1f916101d18c5ad2a Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 8 Jul 2020 17:14:40 +0800 Subject: [PATCH 2/3] [fix] fix the bug of uncorrect statistics of job --- .../powerjob/server/common/config/ThreadPoolConfig.java | 1 + .../server/persistence/core/repository/JobInfoRepository.java | 2 +- .../powerjob/server/web/controller/SystemInfoController.java | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index fb784a0e..f309233d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -14,6 +14,7 @@ import java.util.concurrent.*; * 公用线程池配置 * omsTimingPool:用于执行定时任务的线程池 * omsCommonPool:用于执行普通任务的线程池 + * omsCommonPool:用于执行后台任务的线程池,这类任务对时间不敏感,慢慢执行细水长流即可 * taskScheduler:用于定时调度的线程池 * * @author tjq diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java index 08ae7967..3dbd2254 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java @@ -30,6 +30,6 @@ public interface JobInfoRepository extends JpaRepository { // 校验工作流包含的任务 long countByAppIdAndStatusAndIdIn(Long appId, int status, List jobIds); - long countByAppId(long appId); + long countByAppIdAndStatusNot(long appId, int status); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java index 1a7508ed..86a5be65 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java @@ -11,6 +11,7 @@ import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.requests.FriendQueryWorkerClusterStatusReq; +import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; @@ -98,7 +99,7 @@ public class SystemInfoController { SystemOverviewVO overview = new SystemOverviewVO(); // 总任务数量 - overview.setJobCount(jobInfoRepository.countByAppId(appId)); + overview.setJobCount(jobInfoRepository.countByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV())); // 运行任务数 overview.setRunningInstanceCount(instanceInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV())); // 近期失败任务数(24H内) From 28299cce9c46070a3033f2a1950f989c6ee47acc Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 8 Jul 2020 23:24:46 +0800 Subject: [PATCH 3/3] [release] v3.1.3-bugfix --- README.md | 2 +- .../powerjob/server/common/config/SwaggerConfig.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 62e8fa8f..c6ea7baf 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为 | 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** | | 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** | | 报警监控 | 无 | 邮件 | 短信 | **邮件,提供接口允许开发者扩展** | -| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币(公测期间免费,哎,帮打个广告吧) | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | +| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | | DAG工作流 | 不支持 | 不支持 | 支持 | **支持** | diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java index eebbb62c..c574ca59 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java @@ -24,11 +24,11 @@ public class SwaggerConfig { public Docket createRestApi() { // apiInfo()用来创建该Api的基本信息(这些基本信息会展现在文档页面中 ApiInfo apiInfo = new ApiInfoBuilder() - .title("OhMyScheduler") + .title("PowerJob") .description("Distributed scheduling and computing framework.") .license("Apache Licence 2") - .termsOfServiceUrl("https://github.com/KFCFans/OhMyScheduler") - .version("2.0.0") + .termsOfServiceUrl("https://github.com/KFCFans/PowerJob") + .version("3.1.3") .build(); return new Docket(DocumentationType.SWAGGER_2)