diff --git a/README.md b/README.md index 62e8fa8f..c6ea7baf 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为 | 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** | | 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** | | 报警监控 | 无 | 邮件 | 短信 | **邮件,提供接口允许开发者扩展** | -| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币(公测期间免费,哎,帮打个广告吧) | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | +| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | | DAG工作流 | 不支持 | 不支持 | 支持 | **支持** | diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java index eebbb62c..c574ca59 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java @@ -24,11 +24,11 @@ public class SwaggerConfig { public Docket createRestApi() { // apiInfo()用来创建该Api的基本信息(这些基本信息会展现在文档页面中 ApiInfo apiInfo = new ApiInfoBuilder() - .title("OhMyScheduler") + .title("PowerJob") .description("Distributed scheduling and computing framework.") .license("Apache Licence 2") - .termsOfServiceUrl("https://github.com/KFCFans/OhMyScheduler") - .version("2.0.0") + .termsOfServiceUrl("https://github.com/KFCFans/PowerJob") + .version("3.1.3") .build(); return new Docket(DocumentationType.SWAGGER_2) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index 3dc4fd71..f309233d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -8,13 +8,13 @@ import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.*; /** * 公用线程池配置 * omsTimingPool:用于执行定时任务的线程池 * omsCommonPool:用于执行普通任务的线程池 + * omsCommonPool:用于执行后台任务的线程池,这类任务对时间不敏感,慢慢执行细水长流即可 * taskScheduler:用于定时调度的线程池 * * @author tjq @@ -51,7 +51,19 @@ public class ThreadPoolConfig { executor.setQueueCapacity(1024); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("omsCommonPool-"); - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + executor.setRejectedExecutionHandler(new LogOnRejected()); + return executor; + } + + @Bean("omsBackgroundPool") + public Executor initBackgroundPool() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors()); + executor.setQueueCapacity(8192); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("omsBackgroundPool-"); + executor.setRejectedExecutionHandler(new LogOnRejected()); return executor; } @@ -65,4 +77,11 @@ public class ThreadPoolConfig { return scheduler; } + private static final class LogOnRejected implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor p) { + log.error("[OmsThreadPool] Task({}) rejected from pool({}).", r, p); + } + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java index 08ae7967..3dbd2254 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java @@ -30,6 +30,6 @@ public interface JobInfoRepository extends JpaRepository { // 校验工作流包含的任务 long countByAppIdAndStatusAndIdIn(Long appId, int status, List jobIds); - long countByAppId(long appId); + long countByAppIdAndStatusNot(long appId, int status); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index 025a5720..6a71ad1f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -170,15 +170,9 @@ public class InstanceLogService { * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 * @param instanceId 任务实例ID */ - @Async("omsCommonPool") + @Async("omsBackgroundPool") public void sync(Long instanceId) { - // 休眠10秒等待全部数据上报(OmsLogHandler 每隔5秒上报数据) - try { - TimeUnit.SECONDS.sleep(15); - }catch (Exception ignore) { - } - Stopwatch sw = Stopwatch.createStarted(); try { // 先持久化到本地文件 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index a49e084c..d6938678 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -143,7 +143,7 @@ public class InstanceManager { log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name()); // 上报日志数据 - instanceLogService.sync(instanceId); + HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 15, TimeUnit.SECONDS); // workflow 特殊处理 if (wfInstanceId != null) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java index 193add88..a6667d51 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/HashedWheelTimerHolder.java @@ -10,8 +10,12 @@ import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTime */ public class HashedWheelTimerHolder { + // 精确时间轮,每 1S 走一格 public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4); + // 非精确时间轮,每 5S 走一格 + public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0); + private HashedWheelTimerHolder() { } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java index 1a7508ed..86a5be65 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java @@ -11,6 +11,7 @@ import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.requests.FriendQueryWorkerClusterStatusReq; +import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; @@ -98,7 +99,7 @@ public class SystemInfoController { SystemOverviewVO overview = new SystemOverviewVO(); // 总任务数量 - overview.setJobCount(jobInfoRepository.countByAppId(appId)); + overview.setJobCount(jobInfoRepository.countByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV())); // 运行任务数 overview.setRunningInstanceCount(instanceInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV())); // 近期失败任务数(24H内)