From f7b543664d3f10983ff72363e1ca2ced043dcf5f Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 28 Nov 2020 16:10:37 +0800 Subject: [PATCH] feat: multi stage TimeWhele to imporve schedule performance #110 --- .../RejectedExecutionHandlerFactory.java | 58 ++++++++ .../common/config/ThreadPoolConfig.java | 16 +-- .../utils/timewheel/HashedWheelTimer.java | 10 +- .../instance/InstanceTimeWheelService.java | 34 ++++- .../server/test/HashedWheelTimerTest.java | 126 ++++++++++++++++++ .../powerjob/server/test/UtilsTest.java | 43 ------ .../src/test/resources/logback-test.xml | 19 +++ 7 files changed, 241 insertions(+), 65 deletions(-) create mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/RejectedExecutionHandlerFactory.java create mode 100644 powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/HashedWheelTimerTest.java create mode 100644 powerjob-server/src/test/resources/logback-test.xml diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/RejectedExecutionHandlerFactory.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/RejectedExecutionHandlerFactory.java new file mode 100644 index 00000000..691dee87 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/RejectedExecutionHandlerFactory.java @@ -0,0 +1,58 @@ +package com.github.kfcfans.powerjob.server.common; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.RejectedExecutionHandler; + +/** + * 拒绝策略 + * + * @author tjq + * @since 2020/11/28 + */ +@Slf4j +public class RejectedExecutionHandlerFactory { + + /** + * 直接丢弃该任务 + * @param source log name + * @return A handler for tasks that cannot be executed by ThreadPool + */ + public static RejectedExecutionHandler newReject(String source) { + return (r, p) -> { + log.error("[{}] ThreadPool[{}] overload, the task[{}] will be dropped!", source, p, r); + log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source); + }; + } + + /** + * 调用线程运行 + * @param source log name + * @return A handler for tasks that cannot be executed by ThreadPool + */ + public static RejectedExecutionHandler newCallerRun(String source) { + return (r, p) -> { + log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread!", source, p, r); + log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source); + if (!p.isShutdown()) { + r.run(); + } + }; + } + + /** + * 新线程运行 + * @param source log name + * @return A handler for tasks that cannot be executed by ThreadPool + */ + public static RejectedExecutionHandler newThreadRun(String source) { + return (r, p) -> { + log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!", source, p, r); + log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source); + if (!p.isShutdown()) { + new Thread(r).start(); + } + }; + } + +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index 5597eaf5..5762f0ce 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.server.common.config; +import com.github.kfcfans.powerjob.server.common.RejectedExecutionHandlerFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -33,11 +34,7 @@ public class ThreadPoolConfig { executor.setQueueCapacity(0); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("omsTimingPool-"); - executor.setRejectedExecutionHandler((r, e) -> { - log.warn("[OmsTimingService] timing pool can't schedule job immediately, maybe some job using too much cpu times."); - // 定时任务优先级较高,不惜一些代价都需要继续执行,开线程继续干~ - new Thread(r).start(); - }); + executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJobTimingPool")); return executor; } @@ -49,7 +46,7 @@ public class ThreadPoolConfig { executor.setQueueCapacity(8192); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("omsBackgroundPool-"); - executor.setRejectedExecutionHandler(new LogOnRejected()); + executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newReject("PowerJobBackgroundPool")); return executor; } @@ -63,11 +60,4 @@ public class ThreadPoolConfig { return scheduler; } - private static final class LogOnRejected implements RejectedExecutionHandler { - - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor p) { - log.error("[OmsThreadPool] Task({}) rejected from pool({}).", r, p); - } - } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java index 847966a1..241bbe5d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.common.utils.timewheel; import com.github.kfcfans.powerjob.common.utils.CommonUtils; +import com.github.kfcfans.powerjob.server.common.RejectedExecutionHandlerFactory; import com.google.common.collect.Queues; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -66,9 +67,9 @@ public class HashedWheelTimer implements Timer { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build(); BlockingQueue queue = Queues.newLinkedBlockingQueue(16); int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum); - taskProcessPool = new ThreadPoolExecutor(core, 2 * core, + taskProcessPool = new ThreadPoolExecutor(core, 4 * core, 60, TimeUnit.SECONDS, - queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); + queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool")); } startTime = System.currentTimeMillis(); @@ -172,6 +173,11 @@ public class HashedWheelTimer implements Timer { removeIf(timerFuture -> { + // processCanceledTasks 后外部操作取消任务会导致 BUCKET 中仍存在 CANCELED 任务的情况 + if (timerFuture.status == HashedWheelTimerFuture.CANCELED) { + return true; + } + if (timerFuture.status != HashedWheelTimerFuture.WAITING) { log.warn("[HashedWheelTimer] impossible, please fix the bug"); return true; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java index 10a0c9d3..318e215e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java @@ -18,10 +18,15 @@ public class InstanceTimeWheelService { private static final Map CARGO = Maps.newConcurrentMap(); - // 精确时间轮,每 1S 走一格 + // 精确调度时间轮,每 1MS 走一格 private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4); + // 非精确调度时间轮,用于处理高延迟任务,每 10S 走一格 + private static final HashedWheelTimer SLOW_TIMER = new HashedWheelTimer(10000, 1024, 0); + // 支持取消的时间间隔,低于该阈值则不会放进 CARGO private static final long MIN_INTERVAL_MS = 1000; + // 长延迟阈值 + private static final long LONG_DELAY_THRESHOLD_MS = 60000; /** * 定时调度 @@ -30,13 +35,17 @@ public class InstanceTimeWheelService { * @param timerTask 需要执行的目标方法 */ public static void schedule(Long uniqueId, Long delayMS, TimerTask timerTask) { - TimerFuture timerFuture = TIMER.schedule(() -> { - CARGO.remove(uniqueId); - timerTask.run(); - }, delayMS, TimeUnit.MILLISECONDS); - if (delayMS > MIN_INTERVAL_MS) { - CARGO.put(uniqueId, timerFuture); + if (delayMS <= LONG_DELAY_THRESHOLD_MS) { + realSchedule(uniqueId, delayMS, timerTask); + return; } + + long expectTriggerTime = System.currentTimeMillis() + delayMS; + TimerFuture longDelayTask = SLOW_TIMER.schedule(() -> { + CARGO.remove(uniqueId); + realSchedule(uniqueId, expectTriggerTime - System.currentTimeMillis(), timerTask); + }, delayMS - LONG_DELAY_THRESHOLD_MS, TimeUnit.MILLISECONDS); + CARGO.put(uniqueId, longDelayTask); } /** @@ -48,4 +57,15 @@ public class InstanceTimeWheelService { return CARGO.get(uniqueId); } + + private static void realSchedule(Long uniqueId, Long delayMS, TimerTask timerTask) { + TimerFuture timerFuture = TIMER.schedule(() -> { + CARGO.remove(uniqueId); + timerTask.run(); + }, delayMS, TimeUnit.MILLISECONDS); + if (delayMS > MIN_INTERVAL_MS) { + CARGO.put(uniqueId, timerFuture); + } + } + } diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/HashedWheelTimerTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/HashedWheelTimerTest.java new file mode 100644 index 00000000..86dd13ca --- /dev/null +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/HashedWheelTimerTest.java @@ -0,0 +1,126 @@ +package com.github.kfcfans.powerjob.server.test; + +import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTimer; +import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture; +import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerTask; +import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 时间轮测试 + * + * @author tjq + * @since 2020/11/28 + */ +@Slf4j +public class HashedWheelTimerTest { + + @Test + public void testHashedWheelTimer() throws Exception { + + HashedWheelTimer timer = new HashedWheelTimer(1, 1024, 32); + List futures = Lists.newLinkedList(); + + for (int i = 0; i < 1000; i++) { + + String name = "Task" + i; + long nowMS = System.currentTimeMillis(); + int delayMS = ThreadLocalRandom.current().nextInt(60000); + long targetTime = delayMS + nowMS; + + TimerTask timerTask = () -> { + System.out.println("============= " + name + "============= "); + System.out.println("ThreadInfo:" + Thread.currentThread().getName()); + System.out.println("expectTime:" + targetTime);; + System.out.println("currentTime:" + System.currentTimeMillis()); + System.out.println("deviation:" + (System.currentTimeMillis() - targetTime)); + System.out.println("============= " + name + "============= "); + }; + futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS)); + } + + // 随机取消 + futures.forEach(future -> { + + int x = ThreadLocalRandom.current().nextInt(2); + if (x == 1) { + future.cancel(); + } + + }); + + Thread.sleep(1000); + + // 关闭 + System.out.println(timer.stop().size()); + System.out.println("Finished!"); + + Thread.sleep(277777777); + } + + @Test + public void testPerformance() throws Exception { + Stopwatch sw = Stopwatch.createStarted(); + for (long i = 0; i < 10000000; i++) { + long delay = ThreadLocalRandom.current().nextLong(100, 120000); + long expect = System.currentTimeMillis() + delay; + InstanceTimeWheelService.schedule(i, delay, () -> { + log.info("[Performance] deviation:{}", (System.currentTimeMillis() - expect)); + }); + } + log.info("[Performance] insert cost: {}", sw); + + Thread.sleep(90000); + } + + @Test + public void testLongDelayTask() throws Exception { + for (long i = 0; i < 1000000; i++) { + long delay = ThreadLocalRandom.current().nextLong(60000, 60000 * 3); + long expect = System.currentTimeMillis() + delay; + InstanceTimeWheelService.schedule(i, delay, () -> { + log.info("[LongDelayTask] deviation: {}", (System.currentTimeMillis() - expect)); + }); + } + + Thread.sleep(60000 * 4); + } + + @Test + public void testCancelDelayTask() throws Exception { + + AtomicLong executeNum = new AtomicLong(); + AtomicLong cancelNum = new AtomicLong(); + for (long i = 0; i < 1000000; i++) { + long delay = ThreadLocalRandom.current().nextLong(60000, 60000 * 2); + long expect = System.currentTimeMillis() + delay; + InstanceTimeWheelService.schedule(i, delay, () -> { + executeNum.incrementAndGet(); + log.info("[CancelLongDelayTask] deviation: {}", (System.currentTimeMillis() - expect)); + }); + } + + Thread.sleep(10000); + + for (long i = 0; i < 1000000; i++) { + boolean nextBoolean = ThreadLocalRandom.current().nextBoolean(); + if (nextBoolean) { + continue; + } + boolean cancel = InstanceTimeWheelService.fetchTimerFuture(i).cancel(); + log.info("[CancelLongDelayTask] id:{},status:{}", i, cancel); + cancelNum.incrementAndGet(); + } + + Thread.sleep(60000 * 4); + log.info("[CancelLongDelayTask] result -> executeNum:{},cancelNum:{}", executeNum, cancelNum); + } +} diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/UtilsTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/UtilsTest.java index c8b52f20..b3592a4a 100644 --- a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/UtilsTest.java +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/UtilsTest.java @@ -24,49 +24,6 @@ import java.util.stream.Collectors; */ public class UtilsTest { - @Test - public void testHashedWheelTimer() throws Exception { - - HashedWheelTimer timer = new HashedWheelTimer(1, 1024, 32); - List futures = Lists.newLinkedList(); - - for (int i = 0; i < 1000; i++) { - - String name = "Task" + i; - long nowMS = System.currentTimeMillis(); - int delayMS = ThreadLocalRandom.current().nextInt(60000); - long targetTime = delayMS + nowMS; - - TimerTask timerTask = () -> { - System.out.println("============= " + name + "============= "); - System.out.println("ThreadInfo:" + Thread.currentThread().getName()); - System.out.println("expectTime:" + targetTime);; - System.out.println("currentTime:" + System.currentTimeMillis()); - System.out.println("deviation:" + (System.currentTimeMillis() - targetTime)); - System.out.println("============= " + name + "============= "); - }; - futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS)); - } - - // 随机取消 - futures.forEach(future -> { - - int x = ThreadLocalRandom.current().nextInt(2); - if (x == 1) { - future.cancel(); - } - - }); - - Thread.sleep(1000); - - // 关闭 - System.out.println(timer.stop().size()); - System.out.println("Finished!"); - - Thread.sleep(277777777); - } - @Test public void testCronExpression() throws Exception { String cron = "0 * * * * ? *"; diff --git a/powerjob-server/src/test/resources/logback-test.xml b/powerjob-server/src/test/resources/logback-test.xml new file mode 100644 index 00000000..41e6a2e7 --- /dev/null +++ b/powerjob-server/src/test/resources/logback-test.xml @@ -0,0 +1,19 @@ + + + + + + + + + + ${CONSOLE_LOG_PATTERN} + utf8 + + + + + + + + \ No newline at end of file