From f19da1dcf2abd19761b88d347da2369d7a2960de Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 3 Apr 2020 14:49:56 +0800 Subject: [PATCH] finished the basic HashedWheelTimer --- .../utils/timewheel/HashedWheelTimer.java | 258 ++++++++++++++++++ .../server/common/utils/timewheel/Timer.java | 22 ++ .../common/utils/timewheel/TimerFuture.java | 48 ++++ .../common/utils/timewheel/TimerTask.java | 21 ++ .../kfcfans/oms/server/test/UtilsTest.java | 65 +++++ 5 files changed, 414 insertions(+) create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/HashedWheelTimer.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/Timer.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/TimerFuture.java create mode 100644 oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/TimerTask.java create mode 100644 oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/HashedWheelTimer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/HashedWheelTimer.java new file mode 100644 index 00000000..972c7609 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/HashedWheelTimer.java @@ -0,0 +1,258 @@ +package com.github.kfcfans.oms.server.common.utils.timewheel; + +import com.google.common.collect.Queues; +import lombok.extern.slf4j.Slf4j; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +/** + * 时间轮定时器 + * + * @author tjq + * @since 2020/4/2 + */ +@Slf4j +public class HashedWheelTimer implements Timer { + + private final long tickDuration; + private final HashedWheelBucket[] wheel; + private final int mask; + + private final Thread indicatorThread; + + private long startTime; + + private final Queue waitingTasks = Queues.newLinkedBlockingQueue(); + private final Queue canceledTasks = Queues.newLinkedBlockingQueue(); + + private static final int MAXIMUM_CAPACITY = 1 << 30; + + /** + * 新建时间轮定时器 + * @param tickDuration 时间间隔,单位毫秒(ms) + * @param ticksPerWheel 轮盘个数 + */ + public HashedWheelTimer(long tickDuration, int ticksPerWheel) { + + this.tickDuration = tickDuration; + + // 初始化轮盘,大小格式化为2的N次,可以使用 & 代替取余 + int ticksNum = formatSize(ticksPerWheel); + wheel = new HashedWheelBucket[ticksNum]; + for (int i = 0; i < ticksNum; i++) { + wheel[i] = new HashedWheelBucket(); + } + mask = wheel.length - 1; + + startTime = System.currentTimeMillis(); + + // 启动后台线程 + indicatorThread = new Thread(new IndicatorRunnable(), "HashedWheelTimer-Indicator"); + indicatorThread.start(); + } + + @Override + public TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) { + + long targetTime = System.currentTimeMillis() + unit.toMillis(delay); + HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime); + + // 写入阻塞队列,保证并发安全(性能进一步优化可以考虑 Netty 的 Multi-Producer-Single-Consumer队列) + waitingTasks.add(timerFuture); + + return timerFuture; + } + + @Override + public void stop() { + + } + + private final class HashedWheelTimerFuture implements TimerFuture { + + // 预期执行时间 + private final long targetTime; + private final TimerTask timerTask; + + // 所属的时间格,用于快速删除该任务 + private HashedWheelBucket bucket; + // 剩余圈数 + private long totalTicks; + // 当前状态 0 - 初始化等待中,1 - 运行中,2 - 完成,3 - 已取消 + private int status; + + // 状态枚举值 + private static final int WAITING = 0; + private static final int RUNNING = 1; + private static final int FINISHED = 2; + private static final int CANCELED = 3; + + public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) { + + this.targetTime = targetTime; + this.timerTask = timerTask; + this.status = WAITING; + } + + @Override + public TimerTask getTask() { + return timerTask; + } + + @Override + public boolean cancel() { + if (status == WAITING) { + status = CANCELED; + canceledTasks.add(this); + return true; + } + return false; + } + + @Override + public boolean isCancelled() { + return status == CANCELED; + } + + @Override + public boolean isDone() { + return startTime == FINISHED; + } + } + + private static final class HashedWheelBucket extends LinkedList { + + public void expireTimerTasks(long currentTick) { + + removeIf(timerFuture -> { + + if (timerFuture.status != HashedWheelTimerFuture.WAITING) { + log.warn("[HashedWheelTimer] impossible, please fix the bug"); + return true; + } + + // 本轮直接调度 + if (timerFuture.totalTicks <= currentTick) { + + if (timerFuture.totalTicks < currentTick) { + log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug"); + } + + try { + timerFuture.timerTask.onScheduled(); + }catch (Exception ignore) { + + } finally { + timerFuture.status = HashedWheelTimerFuture.FINISHED; + } + return true; + } + + return false; + }); + + } + + } + + /** + * 模拟时钟转动 + */ + private class IndicatorRunnable implements Runnable { + + private long tick = 0; + + @Override + public void run() { + while (true) { + + // 1. 将任务从队列推入时间轮 + pushTaskToBucket(); + // 2. 处理取消的任务 + processCanceledTasks(); + // 3. 等待指针跳向下一刻 + tickTack(); + // 4. 执行定时任务 + int currentIndex = (int) (tick & mask); + HashedWheelBucket bucket = wheel[currentIndex]; + bucket.expireTimerTasks(tick); + + tick ++; + } + } + + /** + * 模拟指针转动,当返回时指针已经转到了下一个刻度 + */ + private void tickTack() { + + // 下一次调度的绝对时间 + long nextTime = startTime + (tick + 1) * tickDuration; + long sleepTime = nextTime - System.currentTimeMillis(); + + if (sleepTime > 0) { + try { + Thread.sleep(sleepTime); + }catch (Exception ignore) { + } + } + } + + /** + * 处理被取消的任务 + */ + private void processCanceledTasks() { + while (true) { + HashedWheelTimerFuture canceledTask = canceledTasks.poll(); + if (canceledTask == null) { + return; + } + // 从链表中删除该任务(bucket为null说明还没被正式推入时间格中,不需要处理) + if (canceledTask.bucket != null) { + canceledTask.bucket.remove(canceledTask); + } + // 调用回调方法 + try { + canceledTask.timerTask.onCanceled(); + }catch (Exception ignore) { + } + } + } + + /** + * 将队列中的任务推入时间轮中 + */ + private void pushTaskToBucket() { + + while (true) { + HashedWheelTimerFuture timerTask = waitingTasks.poll(); + if (timerTask == null) { + return; + } + + // 总共的偏移量 + long offset = timerTask.targetTime - startTime; + // 总共需要走的指针步数 + timerTask.totalTicks = offset / tickDuration; + // 取余计算 bucket index + int index = (int) (timerTask.totalTicks & mask); + + if (timerTask.status == HashedWheelTimerFuture.WAITING) { + wheel[index].add(timerTask); + } + } + } + } + + private static int formatSize(int cap) { + int n = cap - 1; + n |= n >>> 1; + n |= n >>> 2; + n |= n >>> 4; + n |= n >>> 8; + n |= n >>> 16; + return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; + } +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/Timer.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/Timer.java new file mode 100644 index 00000000..c1c9594f --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/Timer.java @@ -0,0 +1,22 @@ +package com.github.kfcfans.oms.server.common.utils.timewheel; + +import java.util.concurrent.TimeUnit; + +/** + * 定时器 + * + * @author tjq + * @since 2020/4/2 + */ +public interface Timer { + + /** + * 调度定时任务 + */ + TimerFuture schedule(TimerTask task, long delay, TimeUnit unit); + + /** + * 停止所有调度任务 + */ + void stop(); +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/TimerFuture.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/TimerFuture.java new file mode 100644 index 00000000..8d5498d3 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/TimerFuture.java @@ -0,0 +1,48 @@ +package com.github.kfcfans.oms.server.common.utils.timewheel; + +/** + * description + * + * @author tjq + * @since 2020/4/3 + */ +public interface TimerFuture { + + TimerTask getTask(); + + /** + * Attempts to cancel execution of this task. This attempt will + * fail if the task has already completed, has already been cancelled, + * or could not be cancelled for some other reason. If successful, + * and this task has not started when {@code cancel} is called, + * this task should never run. If the task has already started, + * then the {@code mayInterruptIfRunning} parameter determines + * whether the thread executing this task should be interrupted in + * an attempt to stop the task. + * + *

After this method returns, subsequent calls to {@link #isDone} will + * always return {@code true}. Subsequent calls to {@link #isCancelled} + * will always return {@code true} if this method returned {@code true}. + * + */ + boolean cancel(); + + /** + * Returns {@code true} if this task was cancelled before it completed + * normally. + * + * @return {@code true} if this task was cancelled before it completed + */ + boolean isCancelled(); + + /** + * Returns {@code true} if this task completed. + * + * Completion may be due to normal termination, an exception, or + * cancellation -- in all of these cases, this method will return + * {@code true}. + * + * @return {@code true} if this task completed + */ + boolean isDone(); +} diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/TimerTask.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/TimerTask.java new file mode 100644 index 00000000..e07ae0a8 --- /dev/null +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/common/utils/timewheel/TimerTask.java @@ -0,0 +1,21 @@ +package com.github.kfcfans.oms.server.common.utils.timewheel; + +/** + * 时间任务接口 + * + * @author tjq + * @since 2020/4/2 + */ +public interface TimerTask { + + /** + * 正常执行时调用 + */ + void onScheduled(); + + /** + * 被取消时调用 + */ + default void onCanceled() { + } +} diff --git a/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java new file mode 100644 index 00000000..8e5a9539 --- /dev/null +++ b/oh-my-scheduler-server/src/test/java/com/github/kfcfans/oms/server/test/UtilsTest.java @@ -0,0 +1,65 @@ +package com.github.kfcfans.oms.server.test; + +import com.github.kfcfans.oms.server.common.utils.timewheel.HashedWheelTimer; +import com.github.kfcfans.oms.server.common.utils.timewheel.TimerFuture; +import com.github.kfcfans.oms.server.common.utils.timewheel.TimerTask; +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +/** + * 工具类测试 + * + * @author tjq + * @since 2020/4/3 + */ +public class UtilsTest { + + @Test + public void testHashedWheelTimer() throws Exception { + + HashedWheelTimer timer = new HashedWheelTimer(1, 1024); + List futures = Lists.newLinkedList(); + + for (int i = 0; i < 1000; i++) { + + String name = "Task" + i; + long nowMS = System.currentTimeMillis(); + int delayMS = ThreadLocalRandom.current().nextInt(60000); + long targetTime = delayMS + nowMS; + + TimerTask timerTask = new TimerTask() { + @Override + public void onScheduled() { + System.out.println("============= " + name + "============= "); + System.out.println("expectTime:" + targetTime);; + System.out.println("currentTime:" + System.currentTimeMillis()); + System.out.println("deviation:" + (System.currentTimeMillis() - targetTime)); + System.out.println("============= " + name + "============= "); + } + + @Override + public void onCanceled() { + System.out.println(name + "be canceled!"); + } + }; + futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS)); + } + + // 随机取消 + futures.forEach(future -> { + + int x = ThreadLocalRandom.current().nextInt(2); + if (x == 1) { + future.cancel(); + } + + }); + + Thread.sleep(277777777); + } + +}