> 文章列表 > jdk-ScheduledThreadPoolExecutor源码学习

jdk-ScheduledThreadPoolExecutor源码学习

jdk-ScheduledThreadPoolExecutor源码学习

基本介绍

    ScheduledThreadPoolExecutor 是 Java 并发包中的一个可调度线程池,它继承自 ThreadPoolExecutor 类,并实现了 ScheduledExecutorService 接口。该线程池可以根据任务的执行时间进行调度执行,支持周期性任务和延迟执行任务。

ScheduledThreadPoolExecutor

继层关系:

jdk-ScheduledThreadPoolExecutor源码学习

可以看出ScheduledThreadPoolExecutor继承于ThreadPoolExecutor与ScheduledExecutorService。

基本使用

package com.executor;import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/* @ClassName ScheduledThreadPoolExecutorStudy* @Description 定时任务线程池* @Author csh* @Date 2023/4/7 9:52*/
public class ScheduledThreadPoolExecutorStudy {public static void main(String[] args) {//corePoolSize指线程ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);//延迟一秒执行for (int i = 0; i < 10; i++) {executor.schedule(new Task(i,"任务1:"),1, TimeUnit.SECONDS);}//周期性执行 每隔3秒(period) 延迟0秒(initialDelay) 定时任务//period 用于间隔执行任务,//1.执行时间小于period:执行完后再隔period时间再执行下一批;//2.执行时间大于period:执行完后不会再隔period时间,而是直接执行。for (int i = 0; i < 10; i++) {executor.scheduleAtFixedRate(new Task(i,"任务2:"),0,3,TimeUnit.SECONDS);}//周期性执行 每隔15秒(delay) 延迟0秒(initialDelay)//initialDelay 首次启动延迟多久后执行//delay 下次任务执行前的延迟时间for (int i = 0; i < 10; i++) {executor.scheduleWithFixedDelay(new Task(i,"任务3:"),3,1,TimeUnit.SECONDS);}}static class Task implements Runnable {private int taskId;private String taskName="默认!";public Task(int taskId) {this.taskId = taskId;}public Task(int taskId, String taskName) {this.taskId = taskId;this.taskName = taskName;}@Overridepublic void run() {System.out.println(taskName + taskId + "开始执行,当前线程名为" + Thread.currentThread().getName());try {// 模拟任务执行的耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(taskName + taskId + "执行完毕,当前线程名为" + Thread.currentThread().getName());}}
}

结果

任务2:0开始执行,当前线程名为pool-1-thread-1
任务2:0执行完毕,当前线程名为pool-1-thread-1
任务2:1开始执行,当前线程名为pool-1-thread-1
任务2:1执行完毕,当前线程名为pool-1-thread-1
任务2:2开始执行,当前线程名为pool-1-thread-1
任务2:2执行完毕,当前线程名为pool-1-thread-1
任务2:3开始执行,当前线程名为pool-1-thread-1
任务2:3执行完毕,当前线程名为pool-1-thread-1
任务2:4开始执行,当前线程名为pool-1-thread-1
任务2:4执行完毕,当前线程名为pool-1-thread-1
任务2:5开始执行,当前线程名为pool-1-thread-1
任务2:5执行完毕,当前线程名为pool-1-thread-1
任务2:6开始执行,当前线程名为pool-1-thread-1
任务2:6执行完毕,当前线程名为pool-1-thread-1
任务2:7开始执行,当前线程名为pool-1-thread-1
任务2:7执行完毕,当前线程名为pool-1-thread-1
任务2:8开始执行,当前线程名为pool-1-thread-1
任务2:8执行完毕,当前线程名为pool-1-thread-1
任务2:9开始执行,当前线程名为pool-1-thread-1
任务2:9执行完毕,当前线程名为pool-1-thread-1
任务1:0开始执行,当前线程名为pool-1-thread-1
任务1:0执行完毕,当前线程名为pool-1-thread-1
任务1:1开始执行,当前线程名为pool-1-thread-1
任务1:1执行完毕,当前线程名为pool-1-thread-1
任务1:2开始执行,当前线程名为pool-1-thread-1
任务1:2执行完毕,当前线程名为pool-1-thread-1
任务1:3开始执行,当前线程名为pool-1-thread-1
任务1:3执行完毕,当前线程名为pool-1-thread-1
任务1:4开始执行,当前线程名为pool-1-thread-1
任务1:4执行完毕,当前线程名为pool-1-thread-1
任务1:5开始执行,当前线程名为pool-1-thread-1
任务1:5执行完毕,当前线程名为pool-1-thread-1
任务1:6开始执行,当前线程名为pool-1-thread-1
任务1:6执行完毕,当前线程名为pool-1-thread-1
任务1:7开始执行,当前线程名为pool-1-thread-1
任务1:7执行完毕,当前线程名为pool-1-thread-1
任务1:8开始执行,当前线程名为pool-1-thread-1
任务1:8执行完毕,当前线程名为pool-1-thread-1
任务1:9开始执行,当前线程名为pool-1-thread-1
任务1:9执行完毕,当前线程名为pool-1-thread-1
任务2:0开始执行,当前线程名为pool-1-thread-1
任务2:0执行完毕,当前线程名为pool-1-thread-1
任务2:1开始执行,当前线程名为pool-1-thread-1
任务2:1执行完毕,当前线程名为pool-1-thread-1
任务2:2开始执行,当前线程名为pool-1-thread-1
任务2:2执行完毕,当前线程名为pool-1-thread-1
任务2:3开始执行,当前线程名为pool-1-thread-1
任务2:3执行完毕,当前线程名为pool-1-thread-1
任务2:4开始执行,当前线程名为pool-1-thread-1
任务2:4执行完毕,当前线程名为pool-1-thread-1
任务2:5开始执行,当前线程名为pool-1-thread-1
任务2:5执行完毕,当前线程名为pool-1-thread-1
任务2:6开始执行,当前线程名为pool-1-thread-1
任务2:6执行完毕,当前线程名为pool-1-thread-1
任务2:7开始执行,当前线程名为pool-1-thread-1
任务2:7执行完毕,当前线程名为pool-1-thread-1
任务2:8开始执行,当前线程名为pool-1-thread-1
任务2:8执行完毕,当前线程名为pool-1-thread-1
任务2:9开始执行,当前线程名为pool-1-thread-1
任务2:9执行完毕,当前线程名为pool-1-thread-1
任务3:0开始执行,当前线程名为pool-1-thread-1
任务3:0执行完毕,当前线程名为pool-1-thread-1
任务3:1开始执行,当前线程名为pool-1-thread-1
任务3:1执行完毕,当前线程名为pool-1-thread-1
任务3:2开始执行,当前线程名为pool-1-thread-1
任务3:2执行完毕,当前线程名为pool-1-thread-1
任务3:3开始执行,当前线程名为pool-1-thread-1
任务3:3执行完毕,当前线程名为pool-1-thread-1
任务3:4开始执行,当前线程名为pool-1-thread-1
任务3:4执行完毕,当前线程名为pool-1-thread-1
任务3:5开始执行,当前线程名为pool-1-thread-1
任务3:5执行完毕,当前线程名为pool-1-thread-1
任务3:6开始执行,当前线程名为pool-1-thread-1
任务3:6执行完毕,当前线程名为pool-1-thread-1
任务3:7开始执行,当前线程名为pool-1-thread-1
任务3:7执行完毕,当前线程名为pool-1-thread-1
任务3:8开始执行,当前线程名为pool-1-thread-1
任务3:8执行完毕,当前线程名为pool-1-thread-1
任务3:9开始执行,当前线程名为pool-1-thread-1
任务3:9执行完毕,当前线程名为pool-1-thread-1
任务2:0开始执行,当前线程名为pool-1-thread-1
任务2:0执行完毕,当前线程名为pool-1-thread-1
任务2:1开始执行,当前线程名为pool-1-thread-1
任务2:1执行完毕,当前线程名为pool-1-thread-1

源码学习

package java.util.concurrent;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;
//ScheduledThreadPoolExecutor 是一个可以定时或延迟执行任务的线程池,通过继承ThreadPoolExecutor和实现ScheduledExecutorService 的基础来实现功能。
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {/1 实现 ThreadPoolExecutor。使用自定义任务类型,ScheduledFutureTask 用于任务,即使是那些不需要调度的任务(即那些使用 ExecutorService 执行,而不是 ScheduledExecutorService 方法提交的任务),它们被视为延迟为零的延迟任务。2. 使用自定义队列(DelayedWorkQueue),这是无限 DelayQueue 的变体。与 ThreadPoolExecutor 相比,缺乏容量约束以及 corePoolSize 和 maximumPoolSize 实际上相同的事实简化了一些执行机制(请参阅延迟执行)。3. 支持可选的关机后运行参数,导致覆盖关机方法以删除和取消关机后不应运行的任务,以及任务(重新)提交与关机重叠时不同的重检逻辑。4. 允许拦截和检测的任务修饰方法,这是必需的,因为子类无法以其他方式覆盖提交方法来获得此效果。不过,这些对池控制逻辑没有任何影响。//* 默认为false用于shutDown周期性任务,若如为true则代码为周期性任务。(也用于判断是否关闭)*/private volatile boolean continueExistingPeriodicTasksAfterShutdown;/*线程池停止后:取消不是周期任务则为false,返之为true(默认)*/private volatile boolean executeExistingDelayedTasksAfterShutdown = true;/* 是否将取消的任务移出队列,默认不移出*/private volatile boolean removeOnCancel = false;/* 通过原子子来保证先进先出*/private static final AtomicLong sequencer = new AtomicLong();/* 返回当前纳秒时间*/final long now() {return System.nanoTime();}//定时任务的实现private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {/ 先进先出的序列号用于断开连接 */private final long sequenceNumber;/ 纳秒时间 */private long time;/* 重复任务的周期(以纳秒为单位)。* 正值表示固定速率执行。* 负值表示固定延迟执行。值 0 表示非重复任务。*/private final long period;/ 当前任务 */RunnableScheduledFuture<V> outerTask = this;/* 队列的索引*/int heapIndex;/*构造方法* ns 纳秒* r r任务* result 返回结果/ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}/* 构造方法*/ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}/* 构造方法*/ScheduledFutureTask(Callable<V> callable, long ns) {super(callable);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}/ 获取延迟的纳秒时间/public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}/对比方法 传入的对象与当前的对象判断是否相同 ,相同返回0 不相同返回 /public int compareTo(Delayed other) {//相同返回0if (other == this) // compare zero if same objectreturn 0;//判断是否ScheduledFutureTask类型if (other instanceof ScheduledFutureTask) {//转换成任务ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;//获取减后时间long diff = time - x.time;//如果小于0 返回-1if (diff < 0)return -1;//大于0返回 1else if (diff > 0)return 1;//当前序号小于传进来参数序号 返回-1else if (sequenceNumber < x.sequenceNumber)return -1;else//都不是返回1return 1;}//最后通过延迟时间计算返回long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}/*判断是否延迟执行 @return {@code true} if periodic*/public boolean isPeriodic() {return period != 0;}/* 设置下次为定期任务运行的时间。*/private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}//取消任务public boolean cancel(boolean mayInterruptIfRunning) {//取消任务boolean cancelled = super.cancel(mayInterruptIfRunning);//取消成功 且 删除标识为真 且 队列索引大于0则进行队列移除if (cancelled && removeOnCancel && heapIndex >= 0)remove(this);return cancelled;}/*执行的方法*/public void run() {//获取是否重复任务标识 true为是 false为否boolean periodic = isPeriodic();//判断线程池状态不支持执行任务,则取消if (!canRunInCurrentRunState(periodic))cancel(false);//不需要重复执行,则直接调用run方法运行else if (!periodic)ScheduledFutureTask.super.run();//重复执行 调用后置else if (ScheduledFutureTask.super.runAndReset()) {//计算下次触发时间setNextRunTime();//重新加入执行队列reExecutePeriodic(outerTask);}}}/* periodic 是否重复周期返回当然运行或关闭状态*/boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);}/* 延迟或周期性任务的主要执行方法。如果池已关闭,则拒绝任务。否则,将任务添加到队列并启动线程(如有必要)以运行它。 @param task the task*/private void delayedExecute(RunnableScheduledFuture<?> task) {//是否关闭if (isShutdown())//调用拒绝执行策略reject(task);else {//不关闭情况下,将任务添加进了队列中super.getQueue().add(task);//再次判断是否关闭(有可能中间关了) 且if (isShutdown() && 当前运行状态为已关闭!canRunInCurrentRunState(task.isPeriodic()) &&//从队列移除remove(task))//取消此次任务task.cancel(false);else//预启动线程池中的工作线程ensurePrestart();}}/*重新执行指定的周期性任务,并将该任务插入到任务队列中。 @param task the task*/void reExecutePeriodic(RunnableScheduledFuture<?> task) {//判断是否允许执行的状态if (canRunInCurrentRunState(true)) {//加入队列super.getQueue().add(task);//再次判断如果为直就移除和取消任务if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);else//预启动线程池中的工作线程ensurePrestart();}}/* 取消并关闭线程池的方法*/@Override void onShutdown() {//获取当前队列BlockingQueue<Runnable> q = super.getQueue();//获取延迟状态boolean keepDelayed =getExecuteExistingDelayedTasksAfterShutdownPolicy();//获取周期任务状态boolean keepPeriodic =getContinueExistingPeriodicTasksAfterShutdownPolicy();//不是延迟状态且不是周期任务if (!keepDelayed && !keepPeriodic) {//循环全部取消及清空for (Object e : q.toArray())if (e instanceof RunnableScheduledFuture<?>)((RunnableScheduledFuture<?>) e).cancel(false);q.clear();}else {// Traverse snapshot to avoid iterator exceptions//如果是周期的先获取列表for (Object e : q.toArray()) {//如果为RunnableScheduledFuture类型if (e instanceof RunnableScheduledFuture) {//转换RunnableScheduledFuture<?> t =(RunnableScheduledFuture<?>)e;//全部删阴笑后取消当前任务if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||t.isCancelled()) { // also remove if already cancelledif (q.remove(t))t.cancel(false);}}}}tryTerminate();}/* 修改或替换用于执行可运行对象的任务。(用于包装)*/protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {return task;}//同上类似protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {return task;}//构造方法 指定核心数public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}//构造方法//corePoolSize 核心数//threadFactory 线程工厂public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory);}//构造方法//corePoolSize 核心数//handler 失败策略public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), handler);}//构造方法//corePoolSize 核心数//threadFactory 线程工厂//handler 失败策略public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);}/* 返回延迟操作的触发时间*/private long triggerTime(long delay, TimeUnit unit) {//最小为0return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));}/* 计算时间 当前时间+延迟时间*/long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}/* 将队列中所有延迟的值限制在彼此的 Long.MAX_VALUE 范围内,以避免在比较中溢出。*/private long overflowFree(long delay) {Delayed head = (Delayed) super.getQueue().peek();if (head != null) {long headDelay = head.getDelay(NANOSECONDS);if (headDelay < 0 && (delay - headDelay < 0))delay = Long.MAX_VALUE + headDelay;}return delay;}/* 定时执行方法* command 执行任务* delay 延迟时间* unit 延迟时间单位*/public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {//为空进行异常 抛出if (command == null || unit == null)throw new NullPointerException();//包装一下RunnableScheduledFuture<?> t = decorateTask(command,//定时任务包装new ScheduledFutureTask<Void>(command, null,//获取延迟执行时间triggerTime(delay, unit)));//延迟执行delayedExecute(t);//返回包装后的对象return t;}/* 同上类似*/public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {if (callable == null || unit == null)throw new NullPointerException();RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));delayedExecute(t);return t;}/* 周期性执行 每隔x秒(period) 延迟x秒(initialDelay) 定时任务* //period 用于间隔执行任务,* //1.执行时间小于period:执行完后再隔period时间再执行下一批;* //2.执行时间大于period:执行完后不会再隔period时间,而是直接执行。*/public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {//任务或单为空抛出空指针异常if (command == null || unit == null)throw new NullPointerException();//如果延迟时间小于等于0则抛出异常if (period <= 0)throw new IllegalArgumentException();//创建任务ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));//包装任务RunnableScheduledFuture<Void> t = decorateTask(command, sft);//初始化周期任务属性sft.outerTask = t;//执行任务delayedExecute(t);//返回封装后对象return t;}/* 周期性执行 每隔x秒(delay) 延迟0秒(initialDelay)//initialDelay 首次启动延迟多久后执行//delay 下次任务执行前的延迟时间*/public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);//初始化周期任务属性sft.outerTask = t;//执行任务delayedExecute(t);return t;}//无返回 零延迟执行任务public void execute(Runnable command) {schedule(command, 0, NANOSECONDS);}// Override AbstractExecutorService methods//有返回 零延迟执行任务public Future<?> submit(Runnable task) {return schedule(task, 0, NANOSECONDS);}//带指定返回结果 的提交public <T> Future<T> submit(Runnable task, T result) {return schedule(Executors.callable(task, result), 0, NANOSECONDS);}//同上类似public <T> Future<T> submit(Callable<T> task) {return schedule(task, 0, NANOSECONDS);}/* 设置是否继续执行现有定期任务的策略。*/public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {continueExistingPeriodicTasksAfterShutdown = value;if (!value && isShutdown())onShutdown();}/* 获取有关是否继续执行现有定期任务的策略*/public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {return continueExistingPeriodicTasksAfterShutdown;}/*设置是否执行现有延迟任务的策略,即使此执行程序已关闭。在这种情况下,这些任务只会在 shutdownNow 时终止,或者在已关闭时将策略设置为 false 后终止。默认情况下,此值为 true。*/public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {executeExistingDelayedTasksAfterShutdown = value;if (!value && isShutdown())onShutdown();}/* 获取有关是否执行现有延迟任务的策略,即使此执行程序已关闭。在这种情况下,这些任务只会在 shutdownNow 时终止,或者在已关闭时将策略设置为 false 后终止。*/public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {return executeExistingDelayedTasksAfterShutdown;}/* 设置在取消时是否应立即从工作队列中删除已取消任务的策略。*/public void setRemoveOnCancelPolicy(boolean value) {removeOnCancel = value;}/* 获取有关在取消时是否应立即从工作队列中删除已取消任务的策略。*/public boolean getRemoveOnCancelPolicy() {return removeOnCancel;}/* 关闭线程池方法(延迟)*/public void shutdown() {super.shutdown();}/* 立即关闭线程池方法并返回任务列表*/public List<Runnable> shutdownNow() {return super.shutdownNow();}/* 获取阻塞队列*/public BlockingQueue<Runnable> getQueue() {return super.getQueue();}/*延迟队列 的实现* 注意:DelayedWorkQueue 是基于堆结构的队列。底层使用的是数组*/static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {/DelayedWorkQueue 基于基于堆的数据结构,如 DelayQueue 和 PriorityQueue 中的数据结构,不同之处在于每个 ScheduledFutureTask 还将其索引记录到堆数组中。这消除了在取消时查找任务的需要,大大加快了删除速度(从 O(n) 下降到 O(log n)),并减少了垃圾保留,否则在清除之前等待元素上升到顶部会发生这种情况。但是由于队列也可能包含不是 ScheduledFutureTasks 的 RunnableScheduledFutures,因此我们不能保证有这样的索引可用,在这种情况下,我们回退到线性搜索。(我们预计大多数任务不会被修饰,并且更快的情况将更加常见。所有堆操作都必须记录索引更改 - 主要是在 siftUp 和 siftDown 中。删除后,任务的堆索引设置为 -1。*///队列的初始容量private static final int INITIAL_CAPACITY = 16;//队列private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];//重入锁private final ReentrantLock lock = new ReentrantLock();//队列当前长度private int size = 0;//指定用于等待队列头部任务的线程。private Thread leader = null;/* 当较新的任务在队列的头部可用时或需要更换leader,通过该条件发出信号*/private final Condition available = lock.newCondition();/* 设置头部索引*/private void setIndex(RunnableScheduledFuture<?> f, int idx) {if (f instanceof ScheduledFutureTask)((ScheduledFutureTask)f).heapIndex = idx;}/* 右移 提供给删除时使用(从大到小)*/private void siftUp(int k, RunnableScheduledFuture<?> key) {//循环获取while (k > 0) {//k-1向右移一位(无符号)int parent = (k - 1) >>> 1;//从队列中取一个线程RunnableScheduledFuture<?> e = queue[parent];//如果大于0跳出if (key.compareTo(e) >= 0)break;//赋值queue[k] = e;setIndex(e, k);k = parent;}//重新赋值queue[k] = key;setIndex(key, k);}/* 从队列中删除一个元素(从小到大)*/private void siftDown(int k, RunnableScheduledFuture<?> key) {//当前右移动一位(无符号)int half = size >>> 1;//循环while (k < half) {//获取自增1 (k-1)再+1int child = (k << 1) + 1;//获取任务RunnableScheduledFuture<?> c = queue[child];//获取下一个位置int right = child + 1;//小于总数 且 对比一致if (right < size && c.compareTo(queue[right]) > 0)//获取元素c = queue[child = right];//如果相同if (key.compareTo(c) <= 0)//跳出break;//重新赋值queue[k] = c;setIndex(c, k);k = child;}queue[k] = key;setIndex(key, k);}/*调整堆数组的大小。*/private void grow() {//获取队列长度int oldCapacity = queue.length;// 旧队列+旧队列右移一位 相当于 10+10/2= 15int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%//如果小于0 那么用最大值if (newCapacity < 0) // overflownewCapacity = Integer.MAX_VALUE;//进行复制queue = Arrays.copyOf(queue, newCapacity);}/*查询对象所在队列位置,如果找不到返回-1*/private int indexOf(Object x) {//不为空if (x != null) {//为任务if (x instanceof ScheduledFutureTask) {int i = ((ScheduledFutureTask) x).heapIndex;// Sanity check; x could conceivably be a// ScheduledFutureTask from some other pool.//循环判断if (i >= 0 && i < size && queue[i] == x)return i;} else {//不是任务循环列表查询for (int i = 0; i < size; i++)if (x.equals(queue[i]))return i;}}//找不到或传入对象为空返回-1return -1;}//判断队列是否包含该任务 包含:true 不包含:falsepublic boolean contains(Object x) {//上锁final ReentrantLock lock = this.lock;lock.lock();try {return indexOf(x) != -1;} finally {lock.unlock();}}//删除任务public boolean remove(Object x) {//上锁final ReentrantLock lock = this.lock;lock.lock();try {//获取下标int i = indexOf(x);//小于0证明找不到if (i < 0)return false;//总数-1setIndex(queue[i], -1);//重新获取数量int s = --size;RunnableScheduledFuture<?> replacement = queue[s];//将队列置空queue[s] = null;if (s != i) {//从队列中移出siftDown(i, replacement);if (queue[i] == replacement)siftUp(i, replacement);}return true;} finally {//最后解锁lock.unlock();}}//获取队列总数 带锁public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return size;} finally {lock.unlock();}}//判断当前队列是否为空 如果是则返回truepublic boolean isEmpty() {return size() == 0;}//获取最大容量public int remainingCapacity() {return Integer.MAX_VALUE;}//获取第一个任务public RunnableScheduledFuture<?> peek() {final ReentrantLock lock = this.lock;lock.lock();try {return queue[0];} finally {lock.unlock();}}//将一个工作项插入到延迟工作队列中。工作项会按照其延迟时间进行排序,并被放置在适当的位置上。如果工作项已经过期,则会立即被取出并执行。public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;final ReentrantLock lock = this.lock;lock.lock();try {int i = size;if (i >= queue.length)grow();//总数+1size = i + 1;//如果为第一个进行赋值if (i == 0) {queue[0] = e;setIndex(e, 0);} else {//添加任务到队列中siftUp(i, e);}//第0个相等if (queue[0] == e) {//leader置空 重新分配leader = null;available.signal();}} finally {lock.unlock();}return true;}//添加任务方法public void put(Runnable e) {offer(e);}//有返回添加方法public boolean add(Runnable e) {return offer(e);}//带过期时间有返回添加方法public boolean offer(Runnable e, long timeout, TimeUnit unit) {return offer(e);}/*将第一个元素替换为最后一个元素并筛选它。*/private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {int s = --size;RunnableScheduledFuture<?> x = queue[s];queue[s] = null;if (s != 0)siftDown(0, x);setIndex(f, -1);return f;}//从延迟工作队列中取出并返回一个可以执行的工作项。如果队列为空,则返回 null。public RunnableScheduledFuture<?> poll() {final ReentrantLock lock = this.lock;lock.lock();try {RunnableScheduledFuture<?> first = queue[0];if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn finishPoll(first);} finally {lock.unlock();}}//从延迟工作队列中取出并返回一个可以执行的工作项,如果队列为空,则当前线程会被阻塞直到队列中有可用的工作项。public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;//带中断锁lock.lockInterruptibly();try {//循环for (;;) {//获取第一个RunnableScheduledFuture<?> first = queue[0];//为空则进入等待if (first == null)available.await();else {//不为空获取延迟时间long delay = first.getDelay(NANOSECONDS);//不延迟则直接返回if (delay <= 0)return finishPoll(first);//将引用置用first = null; // don't retain ref while waiting//leader为空则等待if (leader != null)available.await();else {//获取当前线程并将当前线程赋给leaderThread thisThread = Thread.currentThread();leader = thisThread;try {//等待延迟时间available.awaitNanos(delay);} finally {//如果leader与当前线程一致则将leader置空if (leader == thisThread)leader = null;}}}}} finally {//如果leader为空且队列有值进行重置leaderif (leader == null && queue[0] != null)available.signal();//解锁lock.unlock();}}//带过期时间从延迟工作队列中取出并返回一个可以执行的工作项。如果队列为空,则返回 null。public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null) {if (nanos <= 0)return null;elsenanos = available.awaitNanos(nanos);} else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return finishPoll(first);if (nanos <= 0)return null;first = null; // don't retain ref while waitingif (nanos < delay || leader != null)nanos = available.awaitNanos(nanos);else {Thread thisThread = Thread.currentThread();leader = thisThread;try {long timeLeft = available.awaitNanos(delay);nanos -= delay - timeLeft;} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}//清空队列的方法public void clear() {final ReentrantLock lock = this.lock;lock.lock();try {for (int i = 0; i < size; i++) {RunnableScheduledFuture<?> t = queue[i];if (t != null) {queue[i] = null;setIndex(t, -1);}}size = 0;} finally {lock.unlock();}}/* 获取队列第一个元素*/private RunnableScheduledFuture<?> peekExpired() {// assert lock.isHeldByCurrentThread();RunnableScheduledFuture<?> first = queue[0];return (first == null || first.getDelay(NANOSECONDS) > 0) ?null : first;}//将队列中数据放到c中,并返回数量;(关闭时使用到)public int drainTo(Collection<? super Runnable> c) {if (c == null)throw new NullPointerException();if (c == this)throw new IllegalArgumentException();final ReentrantLock lock = this.lock;lock.lock();try {RunnableScheduledFuture<?> first;int n = 0;while ((first = peekExpired()) != null) {c.add(first); // In this order, in case add() throws.finishPoll(first);++n;}return n;} finally {lock.unlock();}}//同上类似public int drainTo(Collection<? super Runnable> c, int maxElements) {if (c == null)throw new NullPointerException();if (c == this)throw new IllegalArgumentException();if (maxElements <= 0)return 0;final ReentrantLock lock = this.lock;lock.lock();try {RunnableScheduledFuture<?> first;int n = 0;while (n < maxElements && (first = peekExpired()) != null) {c.add(first); // In this order, in case add() throws.finishPoll(first);++n;}return n;} finally {lock.unlock();}}//将队列转成数组public Object[] toArray() {final ReentrantLock lock = this.lock;lock.lock();try {return Arrays.copyOf(queue, size, Object[].class);} finally {lock.unlock();}}//传入a数组将结果放进a返回@SuppressWarnings("unchecked")public <T> T[] toArray(T[] a) {final ReentrantLock lock = this.lock;lock.lock();try {if (a.length < size)return (T[]) Arrays.copyOf(queue, size, a.getClass());System.arraycopy(queue, 0, a, 0, size);if (a.length > size)a[size] = null;return a;} finally {lock.unlock();}}//迭代方法public Iterator<Runnable> iterator() {return new Itr(Arrays.copyOf(queue, size));}//迭代器的实现 包含增 查 删 改private class Itr implements Iterator<Runnable> {final RunnableScheduledFuture<?>[] array;int cursor = 0; // index of next element to returnint lastRet = -1; // index of last element, or -1 if no suchItr(RunnableScheduledFuture<?>[] array) {this.array = array;}public boolean hasNext() {return cursor < array.length;}public Runnable next() {if (cursor >= array.length)throw new NoSuchElementException();lastRet = cursor;return array[cursor++];}public void remove() {if (lastRet < 0)throw new IllegalStateException();DelayedWorkQueue.this.remove(array[lastRet]);lastRet = -1;}}}
}

关于DelayedWorkQueue队列介绍

jdk-ScheduledThreadPoolExecutor源码学习

    DelayedWorkQueue是一个基于最小堆结构的优先队列,主要用于实现延迟任务调度,它可以让我们在指定的时间之后执行一个任务。通过实现Delayed接口,我们可以将需要延迟执行的任务放入这个队列中,并且队列会根据任务的延迟时间进行排序,从而保证按照规定的时间顺序执行任务。DelayedWorkQueue实现于BlockingQueue所以是一个阻塞队列,是一个无界队列。

  • 使用DelayedWorkQueue可以很方便地实现诸如如下需求的场景:

  • 在指定时间之后执行某个任务,比如凌晨3点执行特定任务;

  • 实现任务的延迟执行,比如任务完成后需要等待30秒才能执行下一步操作;

  • 对于需要进行定时调度的任务,比如每10分钟执行一次清理工作。

使用DelayedWorkQueue可以帮助我们避免使用Thread.sleep()等阻塞操作,提高代码效率和可维护性。同时,由于DelayedWorkQueue是基于Java并发包中的线程安全队列实现的,因此也能够保证多线程程序的正确性。

DelayedWorkQueue的特性主要体现在以下两个方面:

  • 支持延迟执行。DelayedWorkQueue允许指定执行任务的延迟时间,从而实现异步任务调度的功能。

  • 支持优先级调度。DelayedWorkQueue通常支持优先级调度,可以根据任务的优先级来决定处理任务的顺序,从而保证高优先级任务能够尽早得到执行。

什么是最小堆?

    最小堆是一种完全二叉树,其中每个节点都比它的子节点小。也就是说,最小堆的根节点是所有节点中最小的节点。因此,最小堆通常被用来实现优先级队列。

最小堆是一种经典的数据结构,在算法和数据结构中有广泛的应用。构建最小堆的过程通常是通过“下沉”操作来完成的。从末尾节点的父节点的这棵树开始调整,根据小根堆的性质,越小的数据往上移动,注意,被调整的节点还有子节点的情况,需要递归进行调整。

最小堆和最大堆类似,但是一个是贪心思想,一个是动态规划的思想,可以用于解决一些算法问题,如堆排序、求Top K等问题。

参考:

https://zhuanlan.zhihu.com/p/299756305

https://zhuanlan.zhihu.com/p/341418979

最后

    ScheduledThreadPoolExecutor中的轮训或者说是延迟实现,可以很好的解决定时去执行的任务,并且通过线程池能够很有效的去管理资源的分配。当然ScheduledThreadPoolExecutor实现了自已优先队列DelayedWorkQueue是一个基于最小堆结构的优先队列,保证每次出队时取出的任务是队列中下次执行时间最小的任务。像很多开源的xxjob或者一些开源框架都有大量的引用,通过该工具类可以支持很多业务场景,当然在使用该线程池要特别注意策略这块的配置,避免因为不知道或者使用默认导致各种场景未考虑到。

参考:

https://www.cs.usfca.edu/~galles/visualization/Heap.html

https://blog.csdn.net/qq_40685275/article/details/99836268