> 文章列表 > Kafka中时间轮分析与Java实现

Kafka中时间轮分析与Java实现

Kafka中时间轮分析与Java实现

仿kafka实现java版时间轮_java实现时间轮算法_Hekliu的博客-CSDN博客

https://www.cnblogs.com/softlin/p/7426083.html

https://blog.csdn.net/happyjacob/article/details/128518700 

一、背景

在Kafka中应用了大量的延迟操作但在Kafka中 并没用使用JDK自带的Timer或是DelayQueue用于延迟操作,而是使用自己开发的DelayedOperationPurgatory组件用于管理延迟操作,Kafka这类分布式框架有大量延迟操作并且对性能要求及其高,而java.util.Timer与java.util.concurrent.DelayQueue的插入和删除时间复杂度都为对数阶O(log n)并不能满足Kafka性能要求,所以Kafka实现了基于时间轮的定时任务组件,该时间轮定时任务实现的插入与删除(开始定时器与暂停定时器)的时间复杂度都为常数阶O(1)
  时间轮的应用并不少见,在Netty、akka、Quartz、Zookeeper等高性能组件中都存在时间轮定时器的踪影;

二、时间轮数据结构

2.1 时间轮名词解释:

     时间格:环形结构中用于存放延迟任务的区块;
  指针(CurrentTime):指向当前操作的时间格,代表当前时间
  格数(ticksPerWheel):为时间轮中时间格的个数
  间隔(tickDuration):每个时间格之间的间隔
  总间隔(interval):当前时间轮总间隔,也就是等于ticksPerWheel*tickDuration

  TimingWheel并非简单的环形时间轮,而是多层级时间轮,每个时间轮由多个时间格组成,每个时间格为一个时间间隔,底层的时间格跨度较小,然后随着延迟任务延迟时间的长短逐层变大;如上图,底下的时间轮每个时间格为1ms,整个时间轮为10ms,而上面一层的时间轮中时间格为10ms,整个时间轮为100ms;

  时间轮添加上级时间轮的规则为:当前currentTime为上级时间轮的startMs,当前interval为上级时间轮的tickDuration,每层ticksPerWheel相同;简单点说就是上层时间轮跨度为当前的M倍,时间格为当前的N倍;

在kafka中第一个槽默认一格表示1ms,第一个时间轮是20个槽,所以

  • 第一一个时间轮代表20ms。
  • 第二个时间轮的每一格式第一个时间轮的总时间,也就是20ms,所以第二个时间轮可表示的时间范围是400ms,
  • 依次类推,第三个时间轮可表示的时间范围是8s,
  • 第四个时间轮是160s等等。

由于时间在向前推进,故一段时间后,第二个时间轮上的任务会向转移到第一个时间轮上,这样递进的方式,最终任务都会执行。
kafka中的每个槽表示一个TimerTaskList,每个任务加到这个TimerTaskList上,如下图中时间轮中每个槽都代表一个TimerTaskList。 

三、Kafka中时间轮的实现

3.1 名词解释

  Kafka中时间轮时间类为TimingWheel,该类结构为存储定时任务的环形队列,内部使用数组实现,数组是用于存放TimerTaskList对象,TimerTaskList环形双向链表,链表项TimerTaskEntry封装了定时任务TimerTaskTimerTaskListTimerTaskEntry中均有超时时间字段,TimerTaskdelayMs字段用于记录任务延迟时间;该三个类为Kafka时间轮实现的核心;

  • TimingWheel:表示一个时间轮,通常会有多层时间轮也就存在多个TimingWheel对象;
  • TimerTaskList:为数组对象用于存放延迟任务,一个TimerTaskList就代表一个时间格,一个时间格中能保存的任务到期时间只可在[t~t+10ms]区间(t为时间格到期时间,10ms时间格间格),每个时间格有个过期时间,时间格过期后时间格中的任务将向前移动存入前面时间格中;
  • TimerTask:表示延迟任务;
  • SystemTimer:kafka实现的定时器,内部封装了TimningWheel用于执行、管理定时任务;

3.2 工作过程

下面通过一个示例来介绍kafka时间轮的工作过程:

  时间轮初始化:初始时间轮中的格数、间隔、指针的初始化时间,创建时间格所对应的buckets数组,计算总间隔interval;
  添加延迟任务:判断该任务是否已被取消、是否已经过期如已过期则把任务放入线程池中执行、根据时间轮总间隔与当前时间判断任务是否可存入当前层级时间轮否则添加上层时间轮并再次尝试往时间轮中添加该任务;

  时间轮降级:有一个定时任务再300ms后将执行,现层级时间轮每层有10个时间格,顶层时间轮的时间格间隔为1ms,整个时间轮为10ms,无法存下该任务。这时创建第二层时间轮,时间格间隔为10ms,整个时间轮为100ms,还是无法存该任务。接着创建第三层时间轮,时间格间隔为100ms,整个时间轮为1000ms,此时任务存入第三层时间轮的第三个时间格中;过了段时间,TimerTaskList到期(时间格)可该任务还有90ms,还无法执行。此时将再次把定时任务添加到时间轮中,顶层时间轮还是无法满足存入条件,往第二层时间轮添加,这时定时任务存入第二层时间轮第九个时间格当中;任务在时间轮中如此反复,直到任务过期时将放入线程池中执行;

3.3 关键实现方法

 public boolean add(TaskEntry e) {synchronized (this) {long expiration = e.getExpirationMs(); if(expiration<(currentTime+tickDuration)){//当前任务过期时间LOGGER.info("当前任务已过期");return false;}else if(expiration<(currentTime+interval)) {//查找时间格的位置,过期时间/时间格%时间轮大小long virtualId = expiration / tickDuration;TaskEntryList taskEntryList = buckets.get((int) (virtualId % ticksPerWheel));taskEntryList.add(e); //设置EntryList过期时间if(taskEntryList.setTime(virtualId * tickDuration)) { listDelayQueue.offer(taskEntryList);}return true;}else{if(overflowWheel==null){ // 添加上级timingWheeladdOverflowWheel();}return overflowWheel.add(e);}}}  /***时间表针移动* @param timeMS*/public void advanceClock(long timeMS){if(timeMS>=(currentTime+tickDuration)){currentTime=timeMS-(timeMS%tickDuration);}if (overflowWheel != null) overflowWheel.advanceClock(currentTime);
}/*** 添加定时任务* @param taskEntry*/
public void add(TaskEntry taskEntry) {if (!timingWheel.add(taskEntry)) {System.out.println(String.format("任务已过期,开始执行 %s",taskEntry.getTimerTask()));taskExecutor.execute(taskEntry.getTimerTask());}
}

四、java版时间轮的实现

4.1 任务TimerTask源码分析

  • TimingWheel: 时间轮时间类,存储定时任务的环形队列,内部使用数组实现,数组是用于存放TimerTaskList对象,
  • TimerTaskList:环形双向链表
  • TimerTaskEntry:环形双向链表的链表项封装了定时任务TimerTask
  • TimerTask: 表示一个要执行的任务,实现了Runnable接口,TimerTaskdelayMs字段用于记录任务延迟时间

TimerTaskListTimerTaskEntry中均有超时时间字段,该三个类为Kafka时间轮实现的核心。

public abstract class TimerTask implements Runnable {public long delayMs; //表示当前任务延迟多久后执行(单位ms),比如说延迟3s,则此值为3000public TimerTask(long delayMs) {this.delayMs =  delayMs;}// 指向TimerTaskEntry对象,一个TimerTaskEntry包含一个TimerTask,TimerTaskEntry是可复用的private TimerTaskList.TimerTaskEntry timerTaskEntry = null;// 取消当前任务,就是从TimerTaskEntry移出TimerTask,并且把当前的timerTaskEntry置空public synchronized void cancel() {if(timerTaskEntry != null) {timerTaskEntry.remove();}timerTaskEntry = null;}//设置当前任务绑定的TimerTaskEntrypublic synchronized void setTimerTaskEntry(TimerTaskList.TimerTaskEntry entry) {if(timerTaskEntry != null && timerTaskEntry != entry) {timerTaskEntry.remove();}timerTaskEntry = entry;}public TimerTaskList.TimerTaskEntry getTimerTaskEntry() {return timerTaskEntry;}
}

4.2 任务包装类TimerTaskEntry

TimerTaskEntryTimerTask的包装,实现了Compareable接口,用来比较两个任务的过期时间,以决定任务list插入的顺序。

public static class TimerTaskEntry implements Comparable<TimerTaskEntry>{//包含一个任务public TimerTask timerTask;// 任务的过期时间,此处的过期时间设置的过期间隔+系统当前时间(毫秒)public Long expirationMs;// 当前任务属于哪一个列表private TimerTaskList list;// 当前任务的上一个任务,用双向列表连接private TimerTaskEntry prev;private TimerTaskEntry next;public TimerTaskEntry(TimerTask timerTask,Long expirationMs) {this.timerTask = timerTask;this.expirationMs = expirationMs;// 传递进来任务TimerTask,并设置TimerTask的包装类if(timerTask != null) {timerTask.setTimerTaskEntry(this);}}// 任务的取消,就是判断任务TimerTask的Entry是否是当前任务public boolean cancel() {return timerTask.getTimerTaskEntry() != this;}// 任务的移出public void remove() {TimerTaskList currentList = list;while(currentList != null) {currentList.remove(this);currentList = list;}}// 比较两个任务在列表中的位置,及那个先执行@Overridepublic int compareTo(TimerTaskEntry that) {return Long.compare(expirationMs,that.expirationMs);}
}

4.3 每个槽中的任务列表

在时间轮中每个槽代表一个列表,即TimerTaskList,每个TimerTaskList中包含多个TimerTaskEntry,并且用双向列表链接。TimerTaskList实现了Delayed接口,用于返回剩余的时间,把上层时间轮的任务移动位置。

public class TimerTaskList implements Delayed {//当前列表中包含的任务数private AtomicInteger taskCounter;// 列表的头结点private TimerTaskEntry root;// 过期时间private AtomicLong expiration = new AtomicLong(-1L);public TimerTaskList(AtomicInteger taskCounter) {this.taskCounter = taskCounter;this.root =  new TimerTaskEntry(null,-1L);root.next = root;root.prev = root;}// 给当前槽设置过期时间public boolean setExpiration(Long expirationMs) {return expiration.getAndSet(expirationMs) != expirationMs;}public Long getExpiration() {return expiration.get();}// 用于遍历当前列表中的任务public synchronized  void foreach(Consumer<TimerTask> f) {TimerTaskEntry entry = root.next;while(entry != root) {TimerTaskEntry nextEntry = entry.next;if(!entry.cancel()) {f.accept(entry.timerTask);}entry = nextEntry;}}// 添加任务到列表中public void add(TimerTaskEntry timerTaskEntry) {boolean done = false;while(!done) {//  在添加之前尝试移除该定时任务,保证该任务没有在其他链表中timerTaskEntry.remove();synchronized (this) {synchronized (timerTaskEntry) {if(timerTaskEntry.list == null) {TimerTaskEntry tail = root.prev;timerTaskEntry.next = root;timerTaskEntry.prev = tail;timerTaskEntry.list = this;tail.next = timerTaskEntry;root.prev = timerTaskEntry;taskCounter.incrementAndGet();done = true;}}}}}//移出任务private synchronized void remove(TimerTaskEntry timerTaskEntry) {synchronized (timerTaskEntry) {if(timerTaskEntry.list == this) {timerTaskEntry.next.prev = timerTaskEntry.prev;timerTaskEntry.prev.next = timerTaskEntry.next;timerTaskEntry.next = null;timerTaskEntry.prev = null;timerTaskEntry.list = null;taskCounter.decrementAndGet();}}}public synchronized void flush(Consumer<TimerTaskEntry> f) {TimerTaskEntry head = root.next;while(head != root) {remove(head);f.accept(head);head = root.next;}expiration.set(-1L);}//获得当前任务剩余时间@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(Math.max(getExpiration() - System.currentTimeMillis(),0),TimeUnit.MICROSECONDS);}@Overridepublic int compareTo(Delayed d) {TimerTaskList other = (TimerTaskList) d;return Long.compare(getExpiration(),other.getExpiration());}
}

4.4 时间轮结构

时间轮TimeWheel代表一层时间轮,即第一层时间轮表示20ms,主要功能是添加任务和驱动时间轮向前。

public class TimingWheel {private Long tickMs;  //每一个槽表示的时间范围private Integer wheelSize; // 时间轮大小,即每一层时间轮的大小private Long startMs; // 系统的启动时间private AtomicInteger taskCounter;  // 当前层任务数private DelayQueue<TimerTaskList> queue; //延迟队列,用于从队列取每个任务列表private Long interval; //每一层时间轮代表的时间private List<TimerTaskList> buckets;  // 每一层的每一个槽中的时间任务列表private Long currentTime;  // 修正后的系统启动时间private TimingWheel overflowWheel = null;  // 上一层时间轮public TimingWheel(Long tickMs, Integer wheelSize, Long startMs, AtomicInteger taskCounter, DelayQueue<TimerTaskList> queue) {this.tickMs = tickMs;this.wheelSize = wheelSize;this.startMs = startMs;this.taskCounter = taskCounter;this.queue = queue;interval = tickMs * wheelSize;currentTime = startMs - (startMs % tickMs); //当前时间,往前推buckets = new ArrayList<>(wheelSize);for(int i = 0;i < wheelSize;i++) {buckets.add(new TimerTaskList(taskCounter));  //创建每一个槽中的列表}}// 创建上层时间轮public synchronized void addOverflowWheel() {if(overflowWheel == null) {overflowWheel = new TimingWheel(interval,  // 此处interval即表示上一层时间轮表示的范围wheelSize,currentTime,taskCounter,queue);}}// 添加任务public boolean add(TimerTaskList.TimerTaskEntry timerTaskEntry) {Long expiration = timerTaskEntry.expirationMs;Long thisTime = currentTime + tickMs;// 任务是否已经取消,取消则返回if(timerTaskEntry.cancel()) {return false;// 当前任务是否已经过期,如果过期则返回false,要立即执行}else if(expiration < currentTime + tickMs) {return false;// 判断当前任务能否在添加到当前时间轮}else if(expiration < currentTime + interval) {Long virtualId = expiration / tickMs;  // 计算当前任务要分配在哪个槽中int whereBucket = (int) (virtualId % wheelSize);TimerTaskList bucket = buckets.get((int)(virtualId % wheelSize));bucket.add(timerTaskEntry);long bucketExpiration = virtualId * tickMs;//更新槽的过期时间,添加入延迟队列if(bucket.setExpiration(virtualId * tickMs)) {queue.offer(bucket);}return true;}else {//添加任务到高层时间轮if(overflowWheel == null) addOverflowWheel();return overflowWheel.add(timerTaskEntry);}}// 向前驱动时间public void advanceClock(Long timeMs) {if(timeMs >= currentTime + tickMs) {currentTime = timeMs - (timeMs % tickMs);if(overflowWheel != null) {overflowWheel.advanceClock(currentTime);}}}
}

4.5 时间轮接口

  • kafka中提供了Timer接口,用于对外提供调用,分别是
    • Timer#add 添加任务;
    • Timer#advanceClock 驱动时间;
    • Timer#size 时间轮中总任务数;
    • Timer#shutdown 停止时间轮
public interface Timer {void add(TimerTask timerTask);boolean advanceClock(Long timeoutMs) throws Exception;int size();void shutdown();
}
  • Timer的实现类是SystemTimer
public class SystemTimer implements Timer {private String executorName;private Long tickMs = 1L;private Integer wheelSize = 20;private Long startMs = System.currentTimeMillis();//用来执行TimerTask任务private ExecutorService taskExecutor =Executors.newFixedThreadPool(1,(runnable) -> {Thread thread = new Thread(runnable);thread.setName("executor-" + executorName);thread.setDaemon(false);return thread;});//延迟队列private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();private AtomicInteger taskCounter = new AtomicInteger(0);private TimingWheel timingWheel;private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();private ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();private ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();// 用来执行时间轮的重新排列,及上一个槽中的任务列表被执行后,后面的槽中的任务列表移动private Consumer<TimerTaskEntry> reinsert = (timerTaskEntry) -> addTimerTaskEntry(timerTaskEntry);public SystemTimer(String executorName, Long tickMs, Integer wheelSize, Long startMs) {this.executorName = executorName;this.tickMs = tickMs;this.wheelSize = wheelSize;this.startMs = startMs;this.timingWheel = new TimingWheel(tickMs,wheelSize,startMs,taskCounter,delayQueue);}// 可能会多个线程操作,所以需要加锁@Overridepublic void add(TimerTask timerTask) {readLock.lock();try{addTimerTaskEntry(new TimerTaskEntry(timerTask,timerTask.delayMs + System.currentTimeMillis()));}finally {readLock.unlock();}}private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {      // 往时间轮添加任务if(!timingWheel.add(timerTaskEntry)) {// 返回false并且任务未取消,则提交当前任务立即执行。if(!timerTaskEntry.cancel()) {taskExecutor.submit(timerTaskEntry.timerTask);}}}// 向前驱动时间轮@Overridepublic boolean advanceClock(Long timeoutMs) throws Exception{// 使用阻塞队列获取任务TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);if(bucket != null) {writeLock.lock();try{while(bucket != null) {timingWheel.advanceClock(bucket.getExpiration());// 驱动时间后,需要移动TimerTaskList到上一个槽或者从上一层移动到本层bucket.flush(reinsert);bucket = delayQueue.poll();}}finally {writeLock.unlock();}return true;}else {return false;}}@Overridepublic int size() {return taskCounter.get();}@Overridepublic void shutdown() {taskExecutor.shutdown();}
}

4.6 时间轮接口测试

public class SystemTimerTest {//驱动时间轮向前的线程private static ExecutorService executorService = Executors.newFixedThreadPool(1);public static  SystemTimer timer = new SystemTimer("test",1000L,5,System.currentTimeMillis());public static void runTask() throws Exception {for(int i = 0;i < 10000;i+= 1000) {// 添加任务,每个任务间隔1stimer.add(new TimerTask(i) {@Overridepublic void run() {System.out.println("运行testTask的时间: " + System.currentTimeMillis());}});}}public static void main(String[] args) throws Exception {runTask();executorService.submit(() -> {while(true) {try {// 驱动时间轮线程间隔0.2s驱动timer.advanceClock(200L);} catch (Exception e) {e.printStackTrace();}}});Thread.sleep(1000000);timer.shutdown();executorService.shutdown();}
}