ThreadPoolExecutor原理剖析
1.前言
1.1 为什么要使用线程池?
线程池主要为了解决两个问题
- 一是当执行大量异步任务时,线程池能够提供较好的性能,避免了重复创建和销毁线程带来的开销
- 二是线程池提供了一种资源限制和管理的手段,比如限制线程个数,动态新增线程
1.2 类图
ThreadPoolExecutor
继承自AbstractExecutorService
,上图中的Executors
类提供了很多静态方法用于创建不同的线程池实例。
在ThreadPoolExecutor
中,成员变量ctl
是一个原子变量,用于记录线程池状态以及线程中的线程个数,类似ReentrantReadWriteLock
中,state
变量的高16位表示读状态,低16位表示写状态。
2.线程池常见成员变量
2.1 线程池状态相关成员变量
// 线程池用高3位表示线程池状态,低29位表示线程个数
// 这里的ctl变量就是保存线程池状态+线程池中线程个数的变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 线程个数掩码位数,值为Integer的位数(32)-3
private static final int COUNT_BITS = Integer.SIZE - 3;// 线程池中工作线程的数目,注意要和largestPoolSize区别开
// 线程池最大线程数:00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 线程池状态常量,用于判断线程池的状态// (高三位)11100000 00000000 00000000 00000000
private static final int RUNNING = -1 << COUNT_BITS;// (高三位)00000000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;// (高三位)00100000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;// (高三位)01000000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;// (高三位)01100000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;
这里RUNNING
相对于其他变量稍微有点区别,因为RUNNING
的值是一个负数通过左移计算得来的。计算机内部是使用补码存放值的,-1的补码为11111111 11111111 11111111 11111111
,左移29位自然就是上面的值了。其他的正数进行左移自然就不用多说。
这里额外说一下线程中常用的用于获取线程状态及线程个数的方法,我们已经知道了ThreadPoolExecutor
用到了一个原子变量保存线程状态及线程个数,所以这几个方法都是对ctl
这个变量进行操作。
// 用于获取ctl变量中的运行状态:*00000 00000000 00000000 00000000
private static int runStateOf(int c) { return c & ~CAPACITY; }// 用于获取ctl变量中的线程个数:000*
private static int workerCountOf(int c) { return c & CAPACITY; }// 计算ctl变量的新值
private static int ctlOf(int rs, int wc) { return rs | wc; }// 表示线程池的运行状态是否小于s
private static boolean runStateLessThan(int c, int s) { return c < s; }// 表示线程池的运行状态是否至少为s
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
2.2 线程池其他常见成员变量
// 存储了线程池中所有的工作线程,用于快速查询线程池中是否存在某个线程。
// 当一个线程加入到线程池中时,会被 workers 集合持有。当线程被移除时,会从集合中删除。
private final HashSet<Worker> workers = new HashSet<Worker>();// 工作线程的阻塞队列,用于存放等待执行的任务。
private final BlockingQueue<Runnable> workQueue;// 线程池从开始到现在有过的最大线程数目
private int largestPoolSize;// 线程池总完成任务量计数
private long completedTaskCount;
3.线程池状态
从上面代码我们可以得知,线程池的状态可以分为:
RUNNING
:接受新任务,并且处理阻塞队列中的任务SHUTDOWN
:拒绝新任务,但是处理阻塞队列中的任务STOP
:拒绝新任务,并且抛弃阻塞队列中的任务,同时会中断正在处理中的任务TIDYING
:所有的任务(包括线阻塞队列中的任务)都执行完后,当前线程池中活动线程数为0,将调用terminated
方法TERMINATED
:终止状态。terminated
方法调用完成以后的状态
线程池的状态转换
RUNNING -> SHUTDOWN
:显式调用shutdown()
方法,或者隐式调用了finalize()
方法里面的shutdown()
方法。RUNNING 或 SHUTDOWN-> STOP
:显式调用shutdownNow()
方法时。SHUTDOWN -> TIDYING
:当线程池和任务队列都为空时。STOP -> TIDYING
:当线程池为空时。TIDYING -> TERMINATED
:当 terminated() hook 方法执行完成时。
4.线程池构造方法中的参数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {...}
corePoolSize
:线程池核心线程个数。workQueue
:用于保存等待执行的任务的阻塞队列,意味着N个核心线程正在执行任务。- 常见的阻塞队列有
- 基于数组的有界ArrayBlockingQueue
- 基于链表的无界 LinkedBlockingQueue
- 最多只有一个元素的同步队列 SynchronousQueue
- 优先级队列 PriorityBlockingQueue
- 常见的阻塞队列有
maximunPoolSize
:线程池最大线程数量。当核心线程都正在执行任务,且阻塞队列已满,那么就会创建新的线程,直到maximunPoolSize个。ThreadFactory
:创建线程的工厂。RejectedExecutionHandler
:饱和策略,当队列满并且线程个数达到 maximunPoolSize后采取的策略。- 常见的包和策略有
- AbortPolicy(抛出异常)
- CallerRunsPolicy(使用调用者所在 线程来运行任务)
- DiscardOldestPolicy(调用 poll 丢弃一个任务,执行当前任务)
- DiscardPolicy(默默丢弃 , 不抛出异常)
- 常见的包和策略有
keeyAliveTime
:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。TimeUnit
:存活时间的时间单位。
5.源码分析
execute
这个不必多说,是最常用的方法,就是向线程池提交任务。
public void execute(Runnable command) {// 判断任务是否为null,是则抛出空指针异常if (command == null)throw new NullPointerException();// 判断当前线程池中的线程数目是否超过核心线程数目// 获取当前ctl的值int c = ctl.get();if (workerCountOf(c) < corePoolSize) {// 如果没有超过核心线程数目,那么就调用addWorker方法增加线程池中的线程,然后就此返回if (addWorker(command, true)) // 这里第二个参数用于标识添加的线程是否是核心线程return;// 重新获取ctl的值,因为执行addWorker方法时,线程池的状态可能已经发生了变化c = ctl.get();}// 上面的代码执行完,如果继续向下走,只有两种情况:// (1)workerCountOf(c) = corePoolSize:这种情况会把任务放入阻塞队列// (2)workerCountOf(c) > corePoolSize:这种情况会创建新线程// 如果线程池处于运行状态,那么就添加任务到阻塞队列中if (isRunning(c) && workQueue.offer(command)) {// 二次检查,重新获取ctl的值,原因同上int recheck = ctl.get();// 如果线程池已经不处于recheck状态,那么就尝试将刚刚添加的任务移除,同时执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 当前线程为空则添加一个新的线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 添加任务到阻塞队列失败,换句话说就是阻塞队列已满else if (!addWorker(command, false))// 执行拒绝策略reject(command);
}
接下来我们将详细讲解public void execute(Runnable command)
方法中的addWorker
方法。
addWorker
该方法的主要作用是向线程池中添加工作线程。
private boolean addWorker(Runnable firstTask, boolean core) {// 外层循环retry:for (;;) {// 获取ctl变量的值int c = ctl.get();// 获得线程池状态int rs = runStateOf(c);// (1)// 检查队列是否只在必要时为空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 内层循环// 循环CAS增加线程个数for (;;) {// 获取当前线程池中的线程数目int wc = workerCountOf(c);// 如果线程数目超限则直接返回false意味着添加工作线程失败if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// (2)// 这里使用CAS增加线程个数,只有一个线程可以CAS成功if (compareAndIncrementWorkerCount(c))// CAS操作成功则跳出外循环,继续向下执行break retry;// CAS操作失败以后,判断线程池状态是否发生改变// 如果变化则重新获取线程池状态值,进行下一轮的外层循环,如果未发生变化,那么就直接进行下一轮的内层循环(继续CAS增加线程数目)c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;}}/--------------------------------------------------------------------------------------------/// 标识新增线程是否启动成功boolean workerStarted = false;// 标识新增线程是否添加成功boolean workerAdded = false;Worker w = null;try {// 创建workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 加独占锁,为了实现workers同步,因为可能多个线程调用了execute方法final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 重新检查线程池状态,以免在获取锁前调用了shutdown接口int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 判断线程是否可以启动if (t.isAlive()) throw new IllegalThreadStateException();// 添加任务workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;// 将添加任务标识修改为成功workerAdded = true;}} finally {mainLock.unlock();}// 添加任务成功以后则启动任务,同时将启动任务标识修改为trueif (workerAdded) {t.start();workerStarted = true;}}} finally {// 如果线程启动失败,则调用addWorkerFailed方法将worker回滚到创建时的创建状态if (! workerStarted)addWorkerFailed(w);}// 最后将worker启动标识返回return workerStarted;
}
addWorker
方法有点长,但是大体可以分为两部分来看,根据代码中我标注的横线注释可以分为
- 双重循环通过CAS操作增加线程数(注意是线程数,不是线程!)
- 把并发安全的部分添加到
workers
里面,并启动任务执行
大体逻辑除了有点长,其实还是很好理解的,不过我在代码中标注的两个序号(1)(2)要稍微注意下。
(1)
下面的判断代码,其实等价于下面的代码
rs >= SHUTDOWN &&(rs != SHUTDOWN ||firstTask != null ||workQueue.isEmpty())
也就意味着在(1)中上面的判断中有三种情况会返回true,也就是addWorker
方法失败,返回false
-
当前线程池状态为
STOP
、TIDYING
或TERMINATED
- 这三个状态都意味着线程池即将关闭,这个时候添加工作线程是不合理的。
- RUNNING(
11100000 00000000 00000000 00000000
)是负数,不满足条件
-
当前线程池状态为
SHUTDOWN
,并且firstTask不为null。- 如果线程池为
SHUTDOWN
状态,这个时候添加有任务的工作线程本身就是不合理的。如果添加一个不携带任务的工作线程,加快SHUTDOWN
转变为TIDYING
倒是合理。
- 如果线程池为
-
当前线程池状态为
SHUTDOWN
,并且任务队列为空- 这就意味着任务队列为空,此时线程池中可能可能还有正在执行任务的线程,当这几个正在执行的线程执行完毕,就会转变为
TIDYING
状态。没有多的任务要做却创建一个不携带任务的工作线程,有点浪费的意思了。
- 这就意味着任务队列为空,此时线程池中可能可能还有正在执行任务的线程,当这几个正在执行的线程执行完毕,就会转变为
以上内容皆为个人愚见,如果错误,请多多指教!
(2)
这个地方的代码是使用CAS操作增加线程数目,我们直到如果众多线程并发CAS修改一个变量,只能有一个变量修改成功同时返回false,其他的线程就都会CAS失败返回false。
- 如果CAS操作成功,也就意味着addWorker中上半部分的代码任务已经执行完毕,直接跳出外层循环。
- 但是如果CAS操作失败,就会判断当前线程池的状态是否改变
- 如果没有改变(当前线程增加线程数目的任务还没有完成)就会继续内层循环继续尝试增加线程数目
- 如果已经改变,那么就会进行下一轮的外层循环,重新获取线程池的状态变量然后重复上述逻辑直到CAS增加线程数目成功
runWorker
从类图我们可以得知,Worker
类实现了Runnable
接口,所以他一定实现了了run()
,而这个run()
方法正是调用runWorker方法。
在前面addWorker
方法中,我们已经看到了当Worker
成功添加进workers
集合后,会调用start()
方法,start
方法会开启一个线程,线程就绪以后会执行run
方法,这部分其实也就是接着上述addWorker
的w.start()
逻辑来的。所以我们接下来会详细讲runWorker
。
首先看一下Worker的构造方法:
Worker(Runnable firstTask) {// 设置state为-1,在调用runWorker方法前禁止中断setState(-1); // 将任务传给Worker的成员变量firstTaskthis.firstTask = firstTask;// 创建一个线程this.thread = getThreadFactory().newThread(this);
}
随后来看看runWorker方法到底做了什么?
final void runWorker(Worker w) {// 获得当前工作线程Thread wt = Thread.currentThread();// 获得Worker中的任务Runnable task = w.firstTask;// 将Worker中的firstTask置为nullw.firstTask = null;// 在我们创建Worker的时候构造方法中将state置为了-1,禁止被中断,这里将state置为0,意味着允许被中断w.unlock(); // 标识线程是否是突然退出,如果在任务执行完毕以前(completedAbruptly为true),当前线程被中断,就会为true标识突然中断boolean completedAbruptly = true;try {// 如果task不为null或者从任务队列获取的任务不为null,则进入循环while (task != null || (task = getTask()) != null) {w.lock();// 如果池停止,请确保线程中断;如果没有,请确保线程未中断。if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 执行任务前干一些事情(未定义)beforeExecute(wt, task);Throwable thrown = null;try {// 正式执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 执行任务后做一些事情(未定义)afterExecute(task, thrown);}} finally {// 任务完成后手动释放内存task = null;// 将Work的完成任务计数器+1w.completedTasks++;// 释放锁w.unlock();}}// 标识线程是否是突然退出completedAbruptly = false;} finally {// 执行清理工作// 第一个参数是工作线程对象,第二参数标识线程是否是被突然中断processWorkerExit(w, completedAbruptly);}
}
整体的代码不算长,逻辑也比较清晰,重要的是最后的processWorkerExit
方法需要看一下。
要注意的是runWorker
方法中有一个while
循环,这个循环只有在任务队列为空时才会停止,停止之后才会调用processWorkerExit
方法清除多余的线程(除核心线程外的线程)。
同时这里提一下ThreadPoolExecutor
是如何缓存复用线程的:
ThreadPoolExecutor
中维护了一个线程池的核心线程数和最大线程数,只要任务队列中还有未执行的任务,线程池就会维护至少核心线程数的线程在空闲状态,以便及时处理新来的任务。当任务执行完毕后,执行该任务的工作线程就会从
BlockingQueue
中获取下一个任务,如果队列中还有未执行的任务,则工作线程就会继续执行该任务,否则工作线程就会将自己从线程池的工作线程列表workers
中移除。
processWorkerExit
这个方法主要是为了清除多余的空闲线程。比如,我们上面已经说到的,while循环结束,也就是任务队列为空,那么除了核心线程之外的线程已经没有任务处理了,所以要清除掉。
private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果是被突然中断了,则不调整线程池中线程数目if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();// 统计整个线程池完成的任务个数,并从工作集中删除当前Workerfinal ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;// 工作线程执行完任务后,及时将线程从线程池中移除,释放资源,避免浪费系统资源。workers.remove(w);} finally {mainLock.unlock();}// 如果当前是SHUTDOWN状态并且工作队列为空,或者当前是STOP状态,当前线程池中没有活动线程。那么尝试设置线程池状态为TERMINATEDtryTerminate();// 如果当前线程个数小于核心线程个数,则增加线程int c = ctl.get();// c < STOP : RUNNINGif (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}
shutdown
调用shutdown方法以后,线程池就不会接受新的任务了,但是工作队列里面的任务还是要执行的。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 权限检查,查看当前线程是否有关闭/中断线程的权限,如果没有则抛出SecurityException或NullPointerExceptioncheckShutdownAccess();// 设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回advanceRunState(SHUTDOWN);// 设置中断标志interruptIdleWorkers();onShutdown(); } finally {mainLock.unlock();}// 尝试将线程池状态修改为TERMINATEDtryTerminate();}
其中前两个方法不太重要,我们看一下interruptIdleWorkers方法。
interruptIdleWorkers
该方法的主要目的就是将所有空闲线程的中断标识设置为true。
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 遍历workers中的所有workerfor (Worker w : workers) {Thread t = w.thread;// 如果当前worker的线程没有被设置为中断,并且没有正在运行则将其设置为中断。// 简单来说就是将所有空闲线程的中断标志设置为trueif (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}for (Worker w : workers) {Thread t = w.thread;// 如果当前worker的线程没有被设置为中断,并且没有正在运行则将其设置为中断。// 简单来说就是将所有空闲线程的中断标志设置为trueif (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}