> 文章列表 > Netty时间轮源码解析

Netty时间轮源码解析

Netty时间轮源码解析

  Netty主要应用用于网络通信,Netty还有一个非常重要的应用领域,即时通信系统IM, 在IM聊天系统中,有成千上万条条链路, Netty是如何管理这些链路的呢 ? Netty还有一套自带的心跳检测机制,这套检测机制的原理是通过创建多个定时任务ScheduleFutureTask,定时一段时间与客户端进行通信 ,确保连接可用。

Netty时间轮的解读

数据结构

  时间轮其实就是一种环形的数据结构,其设计参考了时钟转动的思维,可以想象成时钟,分成很多格子,一个格子代表一段时间(时间越短,精度越高,过段没有意义,根据具体使用场景裁定)。并用一个链表表示该格子上的到期任务,一个指针随着时间一格一格转动,并执行相应格子中的到期任务。任务通过取摸决定放入那个格子。如下图所示:

Netty时间轮源码解析
  都知道时钟有指针,刻度,每刻度表示的时长等属性, Netty的时间轮设计也差不多, 只是时钟的指针有时,分,秒,而Netty只用了一个指针,那么Netty是如何把定时任务加到时间轮的呢? 下面先看一幅时间轮的构造图。
Netty时间轮源码解析
  中间的圆轮代表一个时间周期,轮子上的每个节点关联的链表代表该时间点要触发的任务。如上图所示,假设一个格子是1秒,则整个wheel能表示的时间段为8s,假如当前指针指向2,此时需要调度一个3s后执行的任务,显然应该加入到(2+3=5)的方格中,指针再走3次就可以执行了;如果任务要在10s后执行,应该等指针走完一个round零2格再执行,因此应放入4,同时将round(1)保存到任务中。检查到期任务时应当只执行round为0的,格子上其他任务的round应减1。

  从图中可以看出,当指针指向某一个刻度时, 综会把此刻度中的所有的task任务一一取出并运行,在解读Netty的时间轮代码前 。

  • 时间轮的指针走一轮是多久?
  • 时间轮采用什么容器存储这些task的?
  • 定时任务的运行时间若晚于指针走一轮的终点,则此时任务放在哪个刻度。
  1. 刻度的时间间隔标注为tickDuration, 同时将时间轮的一轮的刻度总数标注为wheelLen,两者都是时间轮属性,可以通过构造方法由使用者传入,这样就可以得到时间轮指针走一轮的时长 = tickDuration * wheelLen。
  2. 当指针运行到某一刻度时,需要把映射在此刻度上所有的任务都取出来 ,而刻度总数在时间轮初始化后就固定了。 因此与Map相似 , 采用数组标识wheel[] 加链表的方式来存储这些task,数组的大小固定为图中的N , 刻度编号就是wheel[]的下标 。
  3. 每个时间轮启动都会记录其启动时间,同时每个定时任务都有其确定的执行时间,用这个执行时间减去时间轮的启动时间,再除以刻度的持续时长,就能获取这个定时任务需要指针走过多少刻度才运行,标注为calculated。

  时间轮本身记录了当前指针已经走过多少刻度,标注为tick,通过caclulated,tick ,时间轮刻度总数wheelLen计算定时任务在哪一刻度上执行(此刻度标注为stopIndex) ,需要分以下几种情况进行处理。

  • 当calculated < tick时, 说明这项任务已经是旧的任务了,可以立即执行。因此stopIndex = tick 。
  • 当(calculated - tick ) <= wheelLen时, stopIndex = (calculated - tick )。
  • 当(calculated - tick) > wheelLen 时, calculated肯定大于wheelLen,若wheelLen是2的整数次幂, 则可以运用与运算stopIndex = calculated & (wheelLen - 1), 若wheelLen 不是2的整数次幂,则把它转换成距离最近的2个整数次幂即可。
时间轮源码剖析之初始化构建

  经过以上的3个问题进行分析,对时间轮的构造有了基本的认知,了解了时间轮内部属性特性,以及定时任务与刻度的映射关系,但具体时间轮是如何运行的,它的指针是如何跳动的,这都需要通过仔细阅读Netty的时间轮源码来寻找答案 , 时间轮源码分为两部分,第一部分包含时间轮的核心属性,初始化构建,启动和定时检测任务的添加,第二部分主要是对时间轮的时钟Worker线程的剖析,线程的核心功能有时钟指针的刻度跳动,超时任务处理,任务的取消等。
Netty时间轮源码解析
  

public interface Timer {Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);Set<Timeout> stop();
}

  HashedWheelTimer本质上是一个Timer,用于将任务定时执行,newTimeout用于添加任务,stop用于终止Timer执行.
  在分析源码之前我们先看一下netty时间轮实现中的核心组件,以便于分析过程中有比较清晰的脉络关系:
Netty时间轮源码解析

public class HashedWheelTimerTest {public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(1);HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 16);final long start = System.currentTimeMillis();timer.newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("task execute,current timestamp=" + (System.currentTimeMillis() - start));countDownLatch.countDown();}}, 2000, TimeUnit.MILLISECONDS);countDownLatch.await();System.out.println("============================" + (System.currentTimeMillis() - start));timer.stop();}
}

看结果输出 :
Netty时间轮源码解析
  这个例子的测试目的很简单,就是创建一个TimerTask任务,只有当这个任务执行完,才停止 timer,从而打印停止时间 。 进入HashedWheelTimer的构造方法 。

static final InternalLogger logger =InternalLoggerFactory.getInstance(HashedWheelTimer.class);private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();   //时间轮的实例个数
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean(); // 在服务过程中,时间轮实例个数不能超过64个
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);     // 刻度持续时最小值,不能小于这个最小值
private static final ResourceLeakDetector<HashedWheelTimer> leakDetector =ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1);  // 内存泄漏检测private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =// 原子性更新时间轮工作状态,防止多线程重复操作AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");private final ResourceLeakTracker<HashedWheelTimer> leak;                   // 内存泄漏检测虚引用
private final Worker worker = new Worker();                         // 用于构建时间轮工作线程的Runnable掌控指针的跳动
private final Thread workerThread;                                      // 时间轮工作线程
// 时间轮的3种工作状态分别为初始化,已经启动正在运行,停止
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile int workerState; // 0 - init, 1 - started, 2 - shut downprivate final long tickDuration;                    // 每刻度的持续时间
// 此数组用于存储映射在时间轮刻度上的
private final HashedWheelBucket[] wheel;private final int mask;                                         // 时间轮总格子数 -1
private final CountDownLatch startTimeInitialized = new CountDownLatch(1); // 同步计数器,时间轮Workder 线程启动后,将同步给调用时间轮的线程// 超时task任务队列,先将任务放入到这个队列中, 再在Worker 线程中队列中取出并放入wheel[]的链表中
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();// 取消的task任务存放队列,在Worker线程中会检测是否有任务需要取消 , 若有,则找到对应的链表,并修改这些取消任务的前后任务的指针
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
private final AtomicLong pendingTimeouts = new AtomicLong(0);
private final long maxPendingTimeouts;              // 时间轮最多容纳多少定时检测任务,默认为-1,无限制private volatile long startTime;                            // 时间轮启动时间public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel) {this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}/***  时间轮构造函数* @param threadFactory  线程工厂,用于创建线程* @param tickDuration              刻度持续时长* @param unit                  刻度持续时长单位* @param ticksPerWheel             时间轮总刻度数* @param leakDetection         是否开启内存泄漏检测* @param maxPendingTimeouts            时间轮可接受最大定时检测任务数*/
public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}if (unit == null) {throw new NullPointerException("unit");}if (tickDuration <= 0) {throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);}if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);}// Normalize ticksPerWheel to power of two and initialize the wheel.   对时间轮刻度数进行格式化,转换成高ticksPerWheel最近的2的整数次幂,并初始化wheel 数组wheel = createWheel(ticksPerWheel);mask = wheel.length - 1;// Convert tickDuration to nanos.  把刻度持续时长转换成纳秒,这样更加精确long duration = unit.toNanos(tickDuration);// Prevent overflow.                检测持续时长不能太长,但也不能太短if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}//  MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1)  刻度持续时最小值,不能小于这个最小值 if (duration < MILLISECOND_NANOS) {if (logger.isWarnEnabled()) {logger.warn("Configured tickDuration %d smaller then %d, using 1ms.",tickDuration, MILLISECOND_NANOS);}this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}workerThread = threadFactory.newThread(worker);                     // 构建时间轮的Worker线程leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;         // 是否需要内存泄漏检测this.maxPendingTimeouts = maxPendingTimeouts;                  // 最大定时检测任务个数if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&                // INSTANCE_COUNT_LIMIT 默认为64 , 时间轮实例个数检测,超过64个会告警WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}
}private static void reportTooManyInstances() {if (logger.isErrorEnabled()) {String resourceType = simpleClassName(HashedWheelTimer.class);logger.error("You are creating too many " + resourceType + " instances. " +resourceType + " is a shared resource that must be reused across the JVM," +"so that only a few instances are created.");}
}

  上面都是一些初始化时间轮的代码,很简单,接下来看创建时间轮的方法 。

/*** 格式化总刻度数,初始化时间轮容器* @param ticksPerWheel* @return*/
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);}if (ticksPerWheel > 1073741824) {throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);}ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);          // 格式化HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();}return wheel;
}
// 找到离ticksPerWheel最近的2个整数次幂
private static int normalizeTicksPerWheel(int ticksPerWheel) {int normalizedTicksPerWheel = 1;while (normalizedTicksPerWheel < ticksPerWheel) {normalizedTicksPerWheel <<= 1;}return normalizedTicksPerWheel;
}

  从时间轮的创建方法得知,我们创建了一个HashedWheelBucket数组,而HashedWheelBucket数组的长度为2的幂次方倍,而HashedWheelBucket就是任务链表。

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task == null) {throw new NullPointerException("task");}if (unit == null) {throw new NullPointerException("unit");}long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); // 需等待执行任务数+ 1 , 同时判断是否超过最大限制if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}start();                            // 若时间轮Worker线程未启动,则需要启动// Add the timeout to the timeout queue which will be processed on the next tick.// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.// 根据定时任务延时执行时间与时间轮启动时间,获取相对的时间轮开始后的任务执行延时时间,因为时间轮开始启动时间不是会改变的, 所以通过这个时间可以获取时钟需要跳动的刻度long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// Guard against overflow.if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}// 构建定时检测任务,并将其添加到新增定时检测任务队列中, 在Worker线程中,会从队列中取出定时检测任务并发放入缓存数组wheel中HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;
}

  重点看时间轮启动方法 。

public void start() {                               // 时间轮启动switch (WORKER_STATE_UPDATER.get(this)) {                  // 根据时间轮状态进行对应的处理case WORKER_STATE_INIT:                            // 当时间轮处于初始化状态时,需要启动它if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {        // 原子性启动workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}// Wait until the startTime is initialized by the worker.while (startTime == 0) {                            // 等待Worker 线程初始化成功try {startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}
}

  启动方法中重点看workerThread.start(), 如果多个线程同时调用start()方法,只有一个线程CAS抢锁成功,并调用workerThread.start( )代码,而其他的线程都在等待startTimeInitialized.await(),那其他等待的线程什么时候从等待中唤醒呢?请看Worker任务的执行方法 。
Netty时间轮源码解析
  大家可能对CountDownLatch使用得比较少,我们可以看一个例子来理解CountDownLatch在这里的原理 。

public class HashedWheelTimerTest2 {public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(1);final long start = System.currentTimeMillis();for(int i = 0 ;i <  3 ;i ++){new Thread(new Runnable() {@Overridepublic void run() {try {countDownLatch.await();System.out.println("===========" + (System.currentTimeMillis() - start));} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();}new Thread(new Runnable() {@Overridepublic void run() {try {System.out.println("----------start------------------");Thread.sleep(3000);countDownLatch.countDown();System.out.println("--------------end -----------------");} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();}
}

执行结果
Netty时间轮源码解析  只有当countDown()方法执行完之后,所有的wait()方法才会继续向下执行,接下来进入Worker的run()方法 。

private final class Worker implements Runnable {private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();  // 当调用了时间轮的stop()方法后,将获取其未执行完的任务private long tick;                              // 时钟指针的跳动次数@Overridepublic void run() {// Initialize the startTime.startTime = System.nanoTime();                      // 时间轮启动的时间if (startTime == 0) {// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.startTime = 1;}// Notify the other threads waiting for the initialization at start().      Worker 线程初始化了,通知调用时间轮启动的线程startTimeInitialized.countDown();do {// 获取下一刻度时间轮总体的执行时间,记录这个时间与时间轮启动时间和大于当前时间时, 线程会睡眠到这个时间点final long deadline = waitForNextTick();if (deadline > 0) {int idx = (int) (tick & mask);      // 获取刻度的编号,即wheel 数组的下标processCancelledTasks();                // 先处理需要取消的任务HashedWheelBucket bucket = wheel[idx];        // 获取刻度所在的缓存链表transferTimeoutsToBuckets();                // 把新增加的定时任务加入wheel数组的缓存链表中bucket.expireTimeouts(deadline);                        // 循环执行刻度所在的缓存链表tick++;             // 执行完后,指针才正式跳动}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);      // 时间轮状态需要为已经启动状态// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket: wheel) {                // 运行到这里说明时间轮停止了,需要把未处理的任务返回bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {                          // 刚刚加入还未来得及放入时间轮缓存中的超时任务 ,也需要捞出并放入到unprocessedTimeouts中一起返回HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}processCancelledTasks();              // 处理需要取消的任务}
}

  Worker线程是整个时间轮的核心,它拥有一个属性——tick。 tick与时间刻度有一定的关联,指针每经过一个刻度后,tick++; tick与mask
(时间轮总格子数-1)进行与操作后,就是时间轮指针的 当前刻度序号。在Worker线程中,tick做了以下4件事。

  1. 等待下一刻度运行时间到来。
  2. 从取消任务队列中获取需要取消的任务并处理。
  3. 从任务队列中获取需要执行的定时检测任务,并把它们放入对 应的刻度链表中。
  4. 从当前刻度链表中取出需要执行的定时检测任务,并循环执行 这些定时检测任务的run()方法。

上边操作用图描述如下:

Netty时间轮源码解析

  接下来看到下一个刻度时需要等待多少毫秒。

private long waitForNextTick() {long deadline = tickDuration * (tick + 1);                      // 获取下一刻度时间轮总体的执行时间for (;;) {final long currentTime = System.nanoTime() - startTime;         // 当前时间 - 启动时间// 计算需要睡眠的毫秒时间 , 由于在将纳秒转化毫秒时需要除以1000000,因此需要加上999999,以防赴丢失尾数,任务被提前执行long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;if (sleepTimeMs <= 0) {                     // 当睡眠时间小于,且 等于Long.MiN_VALUE时,直跳过此刻度,否则不睡眠,直接执行任务if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}// Check if we run on windows, as if thats the case we will need// to round the sleepTime as workaround for a bug that only affect// the JVM if it runs on windows.//// See https://github.com/netty/netty/issues/356  Window 操作系统特殊处理, 其Sleep函数是以10ms 为单位进行延时的,// 也就是说,所有小于10且大于0的情况都是10ms, 所有大于 10且小于20的情况都是20ms , 因此这里做了特殊的处理, 对于小于10ms 的,直接不睡眠。 对于 大于 10ms的,去掉层尾数if (PlatformDependent.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;}try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {// 当发生异常,发现时间轮状态为WORKER_STATE_SHUTDOWN时,立刻返回if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}
}

  再来看取消的任务处理。

private void processCancelledTasks() {for (;;) {HashedWheelTimeout timeout = cancelledTimeouts.poll();if (timeout == null) {// all processedbreak;}try {timeout.remove();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while process a cancellation task", t);}}}
}void remove() {HashedWheelBucket bucket = this.bucket;if (bucket != null) {bucket.remove(this);} else {timer.pendingTimeouts.decrementAndGet();}
}public HashedWheelTimeout remove(HashedWheelTimeout timeout) {HashedWheelTimeout next = timeout.next;// remove timeout that was either processed or cancelled by updating the linked-listif (timeout.prev != null) {timeout.prev.next = next;}if (timeout.next != null) {timeout.next.prev = timeout.prev;}if (timeout == head) {// if timeout is also the tail we need to adjust the entry tooif (timeout == tail) {tail = null;head = null;} else {head = next;}} else if (timeout == tail) {// if the timeout is the tail modify the tail to be the prev node.tail = timeout.prev;}// null out prev, next and bucket to allow for GC.timeout.prev = null;timeout.next = null;timeout.bucket = null;timeout.timer.pendingTimeouts.decrementAndGet();return next;
}

  processCancelledTasks()方法的实现原理也很简单,从cancelledTimeouts队列中取出HashedWheelTimeout,并从bucket链表中移除即可。 那怎样才能将HashedWheelTimeout加入到cancelledTimeouts上呢? 从HashedWheelTimeout的cancel()方法中可以看到 。

public boolean cancel() {// only update the state it will be removed from HashedWheelBucket on next tick.if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {return false;}// If a task should be canceled we put this to another queue which will be processed on each tick.// So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way// we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.timer.cancelledTimeouts.add(this);return true;
}

  因此,可以推出时间轮取消方法的使用。
Netty时间轮源码解析
  当处理完所有的取消任务后,此时会调用transferTimeoutsToBuckets()将所有未加入到链表的任务不回到bucket链表中,接下来看transferTimeoutsToBuckets()的源码实现。

private void transferTimeoutsToBuckets() {// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just// adds new timeouts in a loop.for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {// Was cancelled in the meantime.continue;}// 每个时间轮启动都会记录其启动时间,同时,每个定时任务都有其确定的执行开始时间,用这个执行开始时间减去时间轮的启动时间,// 再除以刻度的持续时长,就能获取这个定时任务需要指针走过多少刻度才运行,标注为calculated。long calculated = timeout.deadline / tickDuration;timeout.remainingRounds = (calculated - tick) / wheel.length;final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);}
}

  这个方法其实还是简单的,通过当前HashedWheelTimeout对象的deadline 执行时间计算出HashedWheelTimeout应该在时间轮的哪个刻度执行,而timeout.remainingRounds 表示时间轮指针运行几圈后执行,如果timeout.remainingRounds为0,则表示指针在当前圈就执行。
Netty时间轮源码解析
  如图中的1节点的任务,需要指针运行1圈后才执行,如果当前指针指向的tick为0,那么指针走过的tick为0,1,2,3,4,5,6,7,0,1,2,3。 大家应该明白remainingRounds的含义了吧。 而指针每走一圈,remainingRounds的值就减少1,当remainingRounds=0时,即任务可以执行,接下来看HashedWheelTimeout的执行方法 。

public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;// process all timeoutswhile (timeout != null) {HashedWheelTimeout next = timeout.next;if (timeout.remainingRounds <= 0) {next = remove(timeout);if (timeout.deadline <= deadline) {timeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next = remove(timeout);} else {// remainingRounds减少1timeout.remainingRounds --;}timeout = next;}
}public void expire() {if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}try {task.run(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);}}
}

  接下来看时间轮的停止方法 。

public Set<Timeout> stop() {if (Thread.currentThread() == workerThread) {throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() +".stop() cannot be called from " +TimerTask.class.getSimpleName());}if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // workerState can be 0 or 2 at this moment - let it always be 2.if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}return Collections.emptySet();}try {boolean interrupted = false;while (workerThread.isAlive()) {workerThread.interrupt();try {workerThread.join(100);} catch (InterruptedException ignored) {interrupted = true;}}if (interrupted) {Thread.currentThread().interrupt();}} finally {INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}return worker.unprocessedTimeouts();
}

  对于时间轮的停止方法,我们需要注意上面加粗代码,因为当前HashedWheelTimer的workerState的值可能是0,也可能是1,当然,是1的可能性更大,因此先使用WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)来修改时间轮的状态,如果修改失败,那HashedWheelTimer的workerState的值可能的值可能是1,因此再调用WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN)修改workerState的状态为WORKER_STATE_SHUTDOWN。 从细节处可以看到Netty能性能的把控。 如果CAS操作将workerState的值修改为WORKER_STATE_SHUTDOWN,此时需要修改总时间轮的个数INSTANCE_COUNTER 减1,接着调用worker.unprocessedTimeouts()方法返回所有未处理的Timeout,大家有没有好奇,workerThread.join(100)这一行代码的含义。
Netty时间轮源码解析
  为什么要等0.1秒呢? 大家发现没有,在等待workerThread线程0.1秒后,返回所有的unprocessedTimeouts,而unprocessedTimeouts从何而来呢? 请看Worker的run()方法 。
Netty时间轮源码解析

public void clearTimeouts(Set<Timeout> set) {for (;;) {HashedWheelTimeout timeout = pollTimeout();if (timeout == null) {return;}if (timeout.isExpired() || timeout.isCancelled()) {continue;}set.add(timeout);}
}

  因此现在知道为什么等待工作线程0.1秒了吧,等待工作线程将已经添加到链表中并没有被取消的HashedWheelTimeout,以及未被添加到链表中的HashedWheelTimeout 保存到unprocessedTimeouts中。

多重时间轮

  当时间跨度很大时,提升单层时间轮的tickDuration可以减少空转次数,但会导致时间精度变低,层级时间轮既可以避免精度降低,又可以避免指针空转次数,如果有时间跨度较长的定时任务,则可交给层级时间轮去调度
Netty时间轮源码解析

源码总结

  HashedWheelTimer时间轮是一个高性能,低消耗的数据结构,它适合用非准实时,延迟的短平快任务,比如心跳检测和会话探活,对于可靠性要求比较严格的延迟任务,时间轮目前并不是比较好的解决方案:

  原生时间轮是单机的,在分布式和多实例部署的场景中乏力
宕机重新恢复执行,原生时间轮的存储是Mpsc队列,毫无疑问是内存存储,如果出现宕机或者重启,数据是不可恢复的
对于类似订单超时取消的场景,可以考虑时间轮+zk + db的方式实现,zk做中心化控制,避免超时任务在多节点重复执行,也即是数据去重,db做为延时任务的持久化存储,宕机可恢复;具体方案可行性有待考量,感兴趣可以自己推演。

参考文章
Netty时间轮

  在实际的开发中,可能有有发行时间轮的需求,因此这里将时间轮代码给摘取出来,以供将来使用。
Netty时间轮源码解析
代码地址
https://gitee.com/quyixiao/hashed-wheel-timer.git

  到这里时间轮代码也分析得差不多了, Netty的代码也分析得差不多了,历时半年时间,Netty源码的解析也告一段落,今后的一段时间可能去研究ZooKeeper源码,下一篇博客见。

Netty测试代码

https://gitee.com/quyixiao/netty-netty-4.1.38.Final.git