[release] Merge branch 'v3.1.3-bugfix'

tjq 2020-07-08 23:26:04 +08:00
commit 9ac3929a8e
8 changed files with 35 additions and 17 deletions

README.md

@@ -48,7 +48,7 @@ PowerJob is designed as an enterprise-grade distributed job scheduling platform
 | Online log viewing (web console) | Not supported | Supported | Not supported | **Supported** |
 | Scheduling mechanism & performance | Database-lock based, performance bottleneck | Database-lock based, performance bottleneck | Unknown | **Lock-free design, performance scales without a hard ceiling** |
 | Alerting & monitoring | None | Email | SMS | **Email, plus an interface for developers to extend** |
-| System dependencies | Any JDBC-compatible relational database (MySQL, Oracle, ...) | MySQL | RMB (free during the public beta; consider this a plug) | **Any relational database supported by Spring Data JPA (MySQL, Oracle, ...)** |
+| System dependencies | Any JDBC-compatible relational database (MySQL, Oracle, ...) | MySQL | RMB | **Any relational database supported by Spring Data JPA (MySQL, Oracle, ...)** |
 | DAG workflows | Not supported | Not supported | Supported | **Supported** |

SwaggerConfig.java

@@ -24,11 +24,11 @@ public class SwaggerConfig {
     public Docket createRestApi() {
         // apiInfo() builds the basic information of this API, shown on the documentation page
         ApiInfo apiInfo = new ApiInfoBuilder()
-                .title("OhMyScheduler")
+                .title("PowerJob")
                 .description("Distributed scheduling and computing framework.")
                 .license("Apache Licence 2")
-                .termsOfServiceUrl("https://github.com/KFCFans/OhMyScheduler")
-                .version("2.0.0")
+                .termsOfServiceUrl("https://github.com/KFCFans/PowerJob")
+                .version("3.1.3")
                 .build();
         return new Docket(DocumentationType.SWAGGER_2)

ThreadPoolConfig.java

@@ -8,13 +8,13 @@ import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.*;
 
 /**
  * Shared thread pool configuration.
  * omsTimingPool: thread pool for timed (scheduling) tasks
  * omsCommonPool: thread pool for ordinary tasks
+ * omsBackgroundPool: thread pool for background tasks; these are not time-sensitive and can be processed slowly, at a steady trickle
  * taskScheduler: thread pool for periodic scheduling
  *
  * @author tjq
@@ -51,7 +51,19 @@ public class ThreadPoolConfig {
         executor.setQueueCapacity(1024);
         executor.setKeepAliveSeconds(60);
         executor.setThreadNamePrefix("omsCommonPool-");
-        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+        executor.setRejectedExecutionHandler(new LogOnRejected());
+        return executor;
+    }
+
+    @Bean("omsBackgroundPool")
+    public Executor initBackgroundPool() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
+        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
+        executor.setQueueCapacity(8192);
+        executor.setKeepAliveSeconds(60);
+        executor.setThreadNamePrefix("omsBackgroundPool-");
+        executor.setRejectedExecutionHandler(new LogOnRejected());
         return executor;
     }
 
@@ -65,4 +77,11 @@ public class ThreadPoolConfig {
         return scheduler;
     }
+
+    private static final class LogOnRejected implements RejectedExecutionHandler {
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor p) {
+            log.error("[OmsThreadPool] Task({}) rejected from pool({}).", r, p);
+        }
+    }
 }
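The new omsBackgroundPool bean is what the InstanceLogService hunk further down switches its @Async annotation to: Spring resolves the string in @Async against executor bean names, so methods tagged @Async("omsBackgroundPool") are submitted to this pool. Note also that LogOnRejected only logs a rejection rather than throwing like AbortPolicy did, so a task arriving when the 8192-slot queue is full is dropped after the error log. A minimal sketch of a bean targeting this pool (the ReportCleaner class and purgeOldLogs method are illustrative names, not part of this commit):

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class ReportCleaner {

    // "omsBackgroundPool" matches the @Bean name defined above, so Spring hands this
    // method off to that executor and the caller returns immediately.
    @Async("omsBackgroundPool")
    public void purgeOldLogs(Long instanceId) {
        // slow, time-insensitive housekeeping work goes here
    }
}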

JobInfoRepository.java

@@ -30,6 +30,6 @@ public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
 
     // Validate the jobs contained in a workflow
     long countByAppIdAndStatusAndIdIn(Long appId, int status, List<Long> jobIds);
 
-    long countByAppId(long appId);
+    long countByAppIdAndStatusNot(long appId, int status);
 }
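Both counting methods are Spring Data JPA derived queries: the framework parses the method name into a WHERE clause, so no @Query annotation is needed. Roughly, and assuming JobInfoDO exposes appId and status properties as the method name implies, the renamed method behaves like this sketch (the JPQL comment is my reading of the naming convention, not text from the commit):

// countByAppIdAndStatusNot(appId, status) is derived by Spring Data JPA into roughly:
//   SELECT COUNT(j) FROM JobInfoDO j WHERE j.appId = :appId AND j.status <> :status
// which lets a caller exclude one status value, e.g. logically deleted jobs:
long visibleJobs = jobInfoRepository.countByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV());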

InstanceLogService.java

@@ -170,15 +170,9 @@ public class InstanceLogService {
      * Syncs the local run logs of a job instance to MongoDB; executed asynchronously after the instance finishes.
      * @param instanceId job instance ID
      */
-    @Async("omsCommonPool")
+    @Async("omsBackgroundPool")
     public void sync(Long instanceId) {
-        // Sleep for a while so that all data gets reported first (OmsLogHandler reports data every 5 seconds)
-        try {
-            TimeUnit.SECONDS.sleep(15);
-        } catch (Exception ignore) {
-        }
 
         Stopwatch sw = Stopwatch.createStarted();
         try {
             // Persist to a local file first

InstanceManager.java

@@ -143,7 +143,7 @@ public class InstanceManager {
         log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());
 
         // Report log data
-        instanceLogService.sync(instanceId);
+        HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 15, TimeUnit.SECONDS);
 
         // Special handling for workflows
         if (wfInstanceId != null) {

HashedWheelTimerHolder.java

@@ -10,8 +10,12 @@ import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTimer;
  */
 public class HashedWheelTimerHolder {
 
+    // Accurate timer wheel: one tick per second
     public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
+
+    // Inaccurate timer wheel: one tick every 5 seconds
+    public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0);
 
     private HashedWheelTimerHolder() {
     }
 }
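Taken together with the InstanceLogService and InstanceManager hunks above, this moves the 15-second wait before the MongoDB log sync off the thread pool: previously sync() began with TimeUnit.SECONDS.sleep(15) on an omsCommonPool thread, whereas now the caller schedules the sync on the coarse 5-second-tick wheel and no thread is parked while waiting. Because the wheel only ticks every 5 seconds the delay is approximate (hence "inaccurate"), which is acceptable for log syncing. A minimal usage sketch, reusing the schedule(Runnable, delay, unit) call shown in the InstanceManager hunk:

// Run the MongoDB log sync roughly 15 seconds after the instance finishes,
// without occupying a worker thread during the wait.
HashedWheelTimerHolder.INACCURATE_TIMER.schedule(
        () -> instanceLogService.sync(instanceId), 15, TimeUnit.SECONDS);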

SystemInfoController.java

@@ -11,6 +11,7 @@ import com.github.kfcfans.powerjob.common.response.ResultDTO;
 import com.github.kfcfans.powerjob.common.utils.JsonUtils;
 import com.github.kfcfans.powerjob.server.akka.OhMyServer;
 import com.github.kfcfans.powerjob.server.akka.requests.FriendQueryWorkerClusterStatusReq;
+import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
 import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
 import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
 import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;

@@ -98,7 +99,7 @@ public class SystemInfoController {
         SystemOverviewVO overview = new SystemOverviewVO();
 
         // Total number of jobs
-        overview.setJobCount(jobInfoRepository.countByAppId(appId));
+        overview.setJobCount(jobInfoRepository.countByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV()));
         // Number of running instances
         overview.setRunningInstanceCount(instanceInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));
         // Number of recently failed instances (within 24 hours)