【并发编程】线程池的原理和源码分析
线程使用上可能的问题
我们一般通过new Thread().start();
来创建和运行一个线程,如果在业务过程中有大量场景需要使用多线程来并发,那么就会有以下问题
- 需要频繁的创建和销毁线程 ,需要消耗CPU资源
- 如果创建和销毁的线程的数量过多(大于CPU核数),那么线程之间需要不断的进行上下文切换,会消耗CPU资源
所以我们需要对线程做一个复用
池化技术的核心: 复用
eg. 连接池、对象池、内存池、线程池 。
线程池的思想
提前创建一系列的线程,保存在这个线程池中。
有任务要执行的时候,从线程池中取出线程来执行。
没有任务的时候,将线程放回到线程池中去。
Java中提供的线程池
Executors
- newFixedThreadPool 创建固定线程数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
- newSingleThreadExecutor 只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
- newCachedThreadPool 可以缓存的线程池 ->理论上来说,有多少请求,该线程池就可以创建多少个线程来处理。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
- newScheduledThreadPool 提供了可以按照周期执行的线程池. ->Timer
ThreadPoolExecutor
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}
线程池的基本参数
public ThreadPoolExecutor(int corePoolSize, //核心线程数int maximumPoolSize, //最大线程数long keepAliveTime, //存活时间TimeUnit unit, //存活单位BlockingQueue<Runnable> workQueue, //阻塞队列ThreadFactory threadFactory, //线程工厂,用来创建工作线程的。 默认实现DefaultThreadFactory,默认创建的都是非守护线程(会在newThread将守护线程也改为非守护线程),如果我们想自定义线程池中线程的名字,可以自己传参RejectedExecutionHandler handler) { //拒绝执行策略 。默认实现-报错if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}
守护线程区别于用户线程,用户线程即我们手动创建的线程,而守护线程是程序运行的时候在后台提供一种通用服务的线程
守护线程和非守护线程的区别在于当主线程销毁停止时,守护线程会一起销毁。
垃圾回收线程就是典型的守护线程。守护线程并不属于程序中不可或缺的部分。我们可以理解守护线程的存在就是为了维护用户线程,当所有的非守护线程(用户线程) 结束时,守护线程也就没有存在的必要了,所以程序会终止,同时会杀死进程中的所有守护线程。
源码分析
ctl 设计
ctl用来记录线程池的状态和当前线程池的线程数,高3位用来记录线程池状态,低29为用来记录线程数;通过位运算,分别可以获取到线程池的状态和当前线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;// == 29private static final int CAPACITY = (1 << COUNT_BITS) - 1;//初始值为 CAPACITY 000 1 1111 1111 1111 1111 1111 1111 1111// runState is stored in the high-order bits, 所以这里的status都会左移29位,就是为了存储在高位private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c) { return c & ~CAPACITY; }private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
execute
线程池中的核心线程是延迟初始化的。
- 如果当前工作线程数小于核心线程数,则先初始化核心线程并执行任务。
- 否则将task放入阻塞队列进去。(offer() -> true/false)
- 如果true ,说明当前的请求量不大(等待的线程不多), 核心线程就可以搞定。
- false,增加工作线程(非核心线程)
- 如果添加失败,说明当前的工作线程数量达到了最大线程数,直接调用拒绝策略。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();//c一个标记,可以获取线程数和当前标志位int c = ctl.get();//判断当前工作线程数是否小于核心线程数(延迟初始化)if (workerCountOf(c) < corePoolSize) {//小于核心线程数则再创建一个新的工作线程,并将commend作为新线程的第一个执行任务if (addWorker(command, true))return;c = ctl.get();}//执行到这里说明,工作线程>=核心线程数或者前面addWork失败//判断当前线程池是否处于Running状态并添加任务到任务队列,由于isRunning和offer非原子,所以后面还要在check一次状态if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再次check ctl状态,非Running状态则从任务队列移除任务-->执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);//如果当前工作线程数为0,则创建一个新的工作线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果队列已满不能添加,那么再创建一个工作线程(扩容的线程,非核心线程)else if (!addWorker(command, false))reject(command);}
addWork
addWorker 主要干两个事情:
1.通过CAS增加线程数,也就是更新ctl (自旋)
2.初始化新的工作线程并启动
3.启动失败则回滚
private boolean addWorker(Runnable firstTask, boolean core) {//这里的retry是一个标志位,可以认为是给retry下一行的循环起的一个名字//若直接使用break那跳出的就是本次循环,使用break retry就是跳出retry标识下的循环retry://自旋,增加线程数量, 通过CAS来保证增加线程操作在并发下的安全性for (;;) {//(1)int c = ctl.get();//获取线程池的当前状态int rs = runStateOf(c);// 如果线程池处于>=SHUTDOWN状态(SHUTDOWN,STOP,TIDYING或者TERMINATED)状态,如果线程池正处于SHUTDOWN状态,则不再接收新任务,但是会继续处理队列中的任务不会马上关闭// case1: firstTask != null 表示要添加新的任务 -> 失败 不能再增加新任务// case2: firstTask == null且workQueue为空 表示队列中的任务都已经处理完毕了 -> 返回false -> addWorkerif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//自旋for (;;) {int wc = workerCountOf(c);//如果当前线程已经超过核心线程数/最大线程数则返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//通过CAS增加线程数量,成功则跳出整个循环,执行(2)之后的代码if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctl//判断运行状态是否发生了改变,如果变化了,则重新执行循环if (runStateOf(c) != rs)//结束本次循环,从(1)开始重新执行循环continue retry;// else CAS failed due to workerCount change; retry inner loop}}//(2)boolean workerStarted = false;boolean workerAdded = false;ThreadPoolExecutor.Worker w = null;try {//Worker extends AbstractQueuedSynchronizer implements Runnable, 创建新的工作线程w = new ThreadPoolExecutor.Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//加锁,保证后续操作的线程安全性mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());//如果线程池是否处于可以正常处理任务的状态 1)运行状态 2)没有新任务的SHUTDOWN状态(即SHUTDOWN以后还在消费工作队列里的任务)if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//如果在启动前,新线程就处于alive状态,抛出异常if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();//当前线程数大于最大线程数,则更新if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//调用线程的start方法,启动线程,执行worker.run()t.start();workerStarted = true;}}} finally {//线程启动失败则回滚, 从work容器移除启动失败的线程并且ctl需要-1if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
runWoker
final void runWorker(ThreadPoolExecutor.Worker w) {Thread wt = Thread.currentThread();//获取当前线程要执行的第一个任务(一般是触发新增这个工作线程的任务)Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//当task不为空,或者可以从任务队列里获取到非空任务,就会一直执行(while循环保证当前线程不结束,直到task为null)while (task != null || (task = getTask()) != null) {//加锁,防止线程池SHUNDOWN,导致正在运行中的任务中断。如果其他地方要shutdown().必须要等我执行完成才可以// Worker继承了AQS -> 实现了互斥锁w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// 如果线程池状态为 STOP 及之上的状态,中断线程任务if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//空实现beforeExecute(wt, task);Throwable thrown = null;try {//调用task的run方法task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//空实现afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 根据completedAbruptly决定当前线程执行结束的时候是否需要补充新的WorkerprocessWorkerExit(w, completedAbruptly);}}
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();//获取当前线程池的状态int rs = runStateOf(c);//如果线程池已经处于STOP状态或者处于SHUTDOWN状态且任务队列已空,直接返回null. 并且更新ctl的workCount(-1)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();//这里返回null,Worker线程的run方法也就执行结束了return null;}//获取当前工作线程数int wc = workerCountOf(c);//如果超过时间没有获取到任务,是否允许回收当前线程// 1)allowCoreThreadTimeOut 为true// 2)当前的工作线程数量大于核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//同时满足以下两种情形,可以销毁当前线程//condition 1.当前线程数大于最大线程数或者在可以超时控制的情形下从阻塞队列里获取任务发生了超时//condition 2.当前线程数>1 或者 任务队列为空if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {//通过CAS减少工作线程数 -1if (compareAndDecrementWorkerCount(c))//销毁当前线程return null;continue;}try {//如果任务队列里没有任务,当前工作线程会阻塞在这里Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//执行到这里表示超过keepAliveTime时间,没有获取到新任务timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {//如果当前线程是由于意外突然结束的,那么需要更新ctl的workCountif (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();//当前线程池处于STOP之前的状态:RUNNING/SHUTDOWNif (runStateLessThan(c, STOP)) {//正常退出则判断当前是否需要补充一个新的线程if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}//非正常退出则补充一个新的非核心线程的工作线程addWorker(null, false);}}
线程池数量
对于IO密集型的应用,因为CPU的利用率不高,
所以我们的线程数可以设置的大一点,为CPU的N倍(具体根据业务代码调试,N>=5都可以)
对于CPU密集型的应用,由于CPU的利用率已经很高了,
设置过大的线程数会导致 频繁进行上下文切换,消耗CPU,此时线程数不建议设置的太高
(N为2~3应该就足够了)