Merge branch 'v3.1.3-bugfix' into jenkins_auto_build

This commit is contained in:
tjq 2020-07-08 16:55:44 +08:00
commit 9012ae18df
4 changed files with 27 additions and 11 deletions

View File

@ -8,8 +8,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.*;
/**
* 公用线程池配置
@ -51,7 +50,19 @@ public class ThreadPoolConfig {
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsCommonPool-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.setRejectedExecutionHandler(new LogOnRejected());
return executor;
}
@Bean("omsBackgroundPool")
public Executor initBackgroundPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
executor.setQueueCapacity(8192);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsBackgroundPool-");
executor.setRejectedExecutionHandler(new LogOnRejected());
return executor;
}
@ -65,4 +76,11 @@ public class ThreadPoolConfig {
return scheduler;
}
private static final class LogOnRejected implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor p) {
log.error("[OmsThreadPool] Task({}) rejected from pool({}).", r, p);
}
}
}

View File

@ -170,15 +170,9 @@ public class InstanceLogService {
* 将本地的任务实例运行日志同步到 mongoDB 存储在任务执行结束后异步执行
* @param instanceId 任务实例ID
*/
@Async("omsCommonPool")
@Async("omsBackgroundPool")
public void sync(Long instanceId) {
// 休眠10秒等待全部数据上报OmsLogHandler 每隔5秒上报数据
try {
TimeUnit.SECONDS.sleep(15);
}catch (Exception ignore) {
}
Stopwatch sw = Stopwatch.createStarted();
try {
// 先持久化到本地文件

View File

@ -143,7 +143,7 @@ public class InstanceManager {
log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());
// 上报日志数据
instanceLogService.sync(instanceId);
HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 15, TimeUnit.SECONDS);
// workflow 特殊处理
if (wfInstanceId != null) {

View File

@ -10,8 +10,12 @@ import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTime
*/
public class HashedWheelTimerHolder {
// 精确时间轮 1S 走一格
public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
// 非精确时间轮 5S 走一格
public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0);
private HashedWheelTimerHolder() {
}
}