feat: multi stage TimeWhele to imporve schedule performance #110

This commit is contained in:
tjq 2020-11-28 16:10:37 +08:00
parent bc9f18819b
commit f7b543664d
7 changed files with 241 additions and 65 deletions

View File

@ -0,0 +1,58 @@
package com.github.kfcfans.powerjob.server.common;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionHandler;
/**
* 拒绝策略
*
* @author tjq
* @since 2020/11/28
*/
@Slf4j
public class RejectedExecutionHandlerFactory {
/**
* 直接丢弃该任务
* @param source log name
* @return A handler for tasks that cannot be executed by ThreadPool
*/
public static RejectedExecutionHandler newReject(String source) {
return (r, p) -> {
log.error("[{}] ThreadPool[{}] overload, the task[{}] will be dropped!", source, p, r);
log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
};
}
/**
* 调用线程运行
* @param source log name
* @return A handler for tasks that cannot be executed by ThreadPool
*/
public static RejectedExecutionHandler newCallerRun(String source) {
return (r, p) -> {
log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread!", source, p, r);
log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
if (!p.isShutdown()) {
r.run();
}
};
}
/**
* 新线程运行
* @param source log name
* @return A handler for tasks that cannot be executed by ThreadPool
*/
public static RejectedExecutionHandler newThreadRun(String source) {
return (r, p) -> {
log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!", source, p, r);
log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
if (!p.isShutdown()) {
new Thread(r).start();
}
};
}
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.common.config;
import com.github.kfcfans.powerjob.server.common.RejectedExecutionHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -33,11 +34,7 @@ public class ThreadPoolConfig {
executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsTimingPool-");
executor.setRejectedExecutionHandler((r, e) -> {
log.warn("[OmsTimingService] timing pool can't schedule job immediately, maybe some job using too much cpu times.");
// 定时任务优先级较高不惜一些代价都需要继续执行开线程继续干
new Thread(r).start();
});
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJobTimingPool"));
return executor;
}
@ -49,7 +46,7 @@ public class ThreadPoolConfig {
executor.setQueueCapacity(8192);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsBackgroundPool-");
executor.setRejectedExecutionHandler(new LogOnRejected());
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newReject("PowerJobBackgroundPool"));
return executor;
}
@ -63,11 +60,4 @@ public class ThreadPoolConfig {
return scheduler;
}
private static final class LogOnRejected implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor p) {
log.error("[OmsThreadPool] Task({}) rejected from pool({}).", r, p);
}
}
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.common.utils.timewheel;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.server.common.RejectedExecutionHandlerFactory;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -66,9 +67,9 @@ public class HashedWheelTimer implements Timer {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(16);
int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
taskProcessPool = new ThreadPoolExecutor(core, 4 * core,
60, TimeUnit.SECONDS,
queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
}
startTime = System.currentTimeMillis();
@ -172,6 +173,11 @@ public class HashedWheelTimer implements Timer {
removeIf(timerFuture -> {
// processCanceledTasks 后外部操作取消任务会导致 BUCKET 中仍存在 CANCELED 任务的情况
if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {
return true;
}
if (timerFuture.status != HashedWheelTimerFuture.WAITING) {
log.warn("[HashedWheelTimer] impossible, please fix the bug");
return true;

View File

@ -18,10 +18,15 @@ public class InstanceTimeWheelService {
private static final Map<Long, TimerFuture> CARGO = Maps.newConcurrentMap();
// 精确时间轮 1S 走一格
// 精确调度时间轮 1MS 走一格
private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
// 非精确调度时间轮用于处理高延迟任务 10S 走一格
private static final HashedWheelTimer SLOW_TIMER = new HashedWheelTimer(10000, 1024, 0);
// 支持取消的时间间隔低于该阈值则不会放进 CARGO
private static final long MIN_INTERVAL_MS = 1000;
// 长延迟阈值
private static final long LONG_DELAY_THRESHOLD_MS = 60000;
/**
* 定时调度
@ -30,13 +35,17 @@ public class InstanceTimeWheelService {
* @param timerTask 需要执行的目标方法
*/
public static void schedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
TimerFuture timerFuture = TIMER.schedule(() -> {
CARGO.remove(uniqueId);
timerTask.run();
}, delayMS, TimeUnit.MILLISECONDS);
if (delayMS > MIN_INTERVAL_MS) {
CARGO.put(uniqueId, timerFuture);
if (delayMS <= LONG_DELAY_THRESHOLD_MS) {
realSchedule(uniqueId, delayMS, timerTask);
return;
}
long expectTriggerTime = System.currentTimeMillis() + delayMS;
TimerFuture longDelayTask = SLOW_TIMER.schedule(() -> {
CARGO.remove(uniqueId);
realSchedule(uniqueId, expectTriggerTime - System.currentTimeMillis(), timerTask);
}, delayMS - LONG_DELAY_THRESHOLD_MS, TimeUnit.MILLISECONDS);
CARGO.put(uniqueId, longDelayTask);
}
/**
@ -48,4 +57,15 @@ public class InstanceTimeWheelService {
return CARGO.get(uniqueId);
}
private static void realSchedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
TimerFuture timerFuture = TIMER.schedule(() -> {
CARGO.remove(uniqueId);
timerTask.run();
}, delayMS, TimeUnit.MILLISECONDS);
if (delayMS > MIN_INTERVAL_MS) {
CARGO.put(uniqueId, timerFuture);
}
}
}

View File

@ -0,0 +1,126 @@
package com.github.kfcfans.powerjob.server.test;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTimer;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerTask;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 时间轮测试
*
* @author tjq
* @since 2020/11/28
*/
@Slf4j
public class HashedWheelTimerTest {
@Test
public void testHashedWheelTimer() throws Exception {
HashedWheelTimer timer = new HashedWheelTimer(1, 1024, 32);
List<TimerFuture> futures = Lists.newLinkedList();
for (int i = 0; i < 1000; i++) {
String name = "Task" + i;
long nowMS = System.currentTimeMillis();
int delayMS = ThreadLocalRandom.current().nextInt(60000);
long targetTime = delayMS + nowMS;
TimerTask timerTask = () -> {
System.out.println("============= " + name + "============= ");
System.out.println("ThreadInfo:" + Thread.currentThread().getName());
System.out.println("expectTime:" + targetTime);;
System.out.println("currentTime:" + System.currentTimeMillis());
System.out.println("deviation:" + (System.currentTimeMillis() - targetTime));
System.out.println("============= " + name + "============= ");
};
futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS));
}
// 随机取消
futures.forEach(future -> {
int x = ThreadLocalRandom.current().nextInt(2);
if (x == 1) {
future.cancel();
}
});
Thread.sleep(1000);
// 关闭
System.out.println(timer.stop().size());
System.out.println("Finished");
Thread.sleep(277777777);
}
@Test
public void testPerformance() throws Exception {
Stopwatch sw = Stopwatch.createStarted();
for (long i = 0; i < 10000000; i++) {
long delay = ThreadLocalRandom.current().nextLong(100, 120000);
long expect = System.currentTimeMillis() + delay;
InstanceTimeWheelService.schedule(i, delay, () -> {
log.info("[Performance] deviation:{}", (System.currentTimeMillis() - expect));
});
}
log.info("[Performance] insert cost: {}", sw);
Thread.sleep(90000);
}
@Test
public void testLongDelayTask() throws Exception {
for (long i = 0; i < 1000000; i++) {
long delay = ThreadLocalRandom.current().nextLong(60000, 60000 * 3);
long expect = System.currentTimeMillis() + delay;
InstanceTimeWheelService.schedule(i, delay, () -> {
log.info("[LongDelayTask] deviation: {}", (System.currentTimeMillis() - expect));
});
}
Thread.sleep(60000 * 4);
}
@Test
public void testCancelDelayTask() throws Exception {
AtomicLong executeNum = new AtomicLong();
AtomicLong cancelNum = new AtomicLong();
for (long i = 0; i < 1000000; i++) {
long delay = ThreadLocalRandom.current().nextLong(60000, 60000 * 2);
long expect = System.currentTimeMillis() + delay;
InstanceTimeWheelService.schedule(i, delay, () -> {
executeNum.incrementAndGet();
log.info("[CancelLongDelayTask] deviation: {}", (System.currentTimeMillis() - expect));
});
}
Thread.sleep(10000);
for (long i = 0; i < 1000000; i++) {
boolean nextBoolean = ThreadLocalRandom.current().nextBoolean();
if (nextBoolean) {
continue;
}
boolean cancel = InstanceTimeWheelService.fetchTimerFuture(i).cancel();
log.info("[CancelLongDelayTask] id:{},status:{}", i, cancel);
cancelNum.incrementAndGet();
}
Thread.sleep(60000 * 4);
log.info("[CancelLongDelayTask] result -> executeNum:{},cancelNum:{}", executeNum, cancelNum);
}
}

View File

@ -24,49 +24,6 @@ import java.util.stream.Collectors;
*/
public class UtilsTest {
@Test
public void testHashedWheelTimer() throws Exception {
HashedWheelTimer timer = new HashedWheelTimer(1, 1024, 32);
List<TimerFuture> futures = Lists.newLinkedList();
for (int i = 0; i < 1000; i++) {
String name = "Task" + i;
long nowMS = System.currentTimeMillis();
int delayMS = ThreadLocalRandom.current().nextInt(60000);
long targetTime = delayMS + nowMS;
TimerTask timerTask = () -> {
System.out.println("============= " + name + "============= ");
System.out.println("ThreadInfo:" + Thread.currentThread().getName());
System.out.println("expectTime:" + targetTime);;
System.out.println("currentTime:" + System.currentTimeMillis());
System.out.println("deviation:" + (System.currentTimeMillis() - targetTime));
System.out.println("============= " + name + "============= ");
};
futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS));
}
// 随机取消
futures.forEach(future -> {
int x = ThreadLocalRandom.current().nextInt(2);
if (x == 1) {
future.cancel();
}
});
Thread.sleep(1000);
// 关闭
System.out.println(timer.stop().size());
System.out.println("Finished");
Thread.sleep(277777777);
}
@Test
public void testCronExpression() throws Exception {
String cron = "0 * * * * ? *";

View File

@ -0,0 +1,19 @@
<?xml version="1.0"?>
<!-- 生产环境日志 -->
<configuration>
<property name="CONSOLE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %m%n"/>
<!-- Console 输出设置 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
</configuration>