> 文章列表 > Java线程池使用与原理解析2(自定义线程池、合适的线程数量、线程池阻塞队列、线程拒绝策略)

Java线程池使用与原理解析2(自定义线程池、合适的线程数量、线程池阻塞队列、线程拒绝策略)

Java线程池使用与原理解析2(自定义线程池、合适的线程数量、线程池阻塞队列、线程拒绝策略)

在上篇我们学习了线程池各个参数的含义,线程池任务处理流程,使用线程池的好处等内容,本篇我们学习如何创建一个适合我们业务的线程池。为此,我们有必要先学习一下如何大概确定我们线程池核心线程数、怎么设置阻塞队列的类型与大小、当线程池没有能力处理任务了该如何使用拒绝策略等内容。

合适的线程数量

对于线程池来说,不同的任务类型可能采取不同的线程数量会取得更好的效果。这是因为有的线程任务对CPU消耗大,任务时间却短,有的任务需要访问网络,需要较长时间才能完成任务。如果只是笼统的定义一个固定大小的线程池,往往会出现CPU不繁忙,但线程池的线程已满,不能再接受任务了,此时却有可能有一些不耗时但耗费CPU的任务没机会执行,从而导致CPU资源浪费等情况。因此,我们需要根据业务特点来定义线程池。

对于CPU 密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。假设设置过多线程,首先会耗费更多资源,大量任务占用线程,但CPU资源有限,这就导致了很多线程上下文切换的成本,此时性能可能不升反降。

对耗时IO任务,比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。这种情况应该设置更多的线程来完成任务,当在等待IO的线程挂起后,此时这些线程并不占用CPU资源,但占用了线程资源,如果没有更多的线程,后续的任务没机会执行,CPU资源也白白浪费。此时如果有更多线程,则这些线程可以去执行其它任务,把CPU资源给利用起来,提高系统的总体性能。

《Java并发编程实战》的作者 Brain Goetz 推荐的计算方法:

线程数 = CPU 核心数 *(1+平均等待时间/平均工作时间)

可见,线程数和平均等待时间成正比,和平均工作时间成反比。总体按这个思路来设置线程池线程大小是可行的,实际还可以根据特定的服务器资源进行压测,根据压测结果来调整以达到更高的性能。

线程池阻塞队列

我们知道使用Executors工具类可以方便的创建线程池(不推荐),其内部实际就是使用ThreadPoolExecutor进行创建,只是提供了一些默认参数,不需要开发者去关注而已。这里不展开Executors给我们提供的那几种线程池,这里关注一下这些线程池里面的阻塞队列类型,为我们自定义线程池作参考。

FixedThreadPool
LinkedBlockingQueue
SingleThreadExecutor
LinkedBlockingQueue
CachedThreadPool
SynchronousQueue
SingleThreadScheduledExecutor
DelayedWorkQueue
ScheduledThreadPoolExecutor
DelayedWorkQueue

首先是FixedThreadPool和SingleThreadExecutor,它们用的阻塞队列都是LinkedBlockingQueue,容量是Integer.MAX_VALUE。因为它们的线程数都是固定的,不能创建非核心线程,因此,队列几乎无限大,核心线程都忙的情况只能把任务放队列,这也是固定线程数的关键。

然后是CachedThreadPool,看名字是一个可缓存的线程池。它的阻塞队列使用的是SynchronousQueue,这个阻塞队列本身不存储数据,它只起到任务中转的作用,当有任务被线程put进来后,必须需要另一线程take,前面的线程才会返回。CachedThreadPool利用这个队列特性来无限创建线程。我们看一下CachedThreadPool创建时传入的参数就可窥一二。

 /*** Creates a thread pool that creates new threads as needed, but* will reuse previously constructed threads when they are* available.  These pools will typically improve the performance* of programs that execute many short-lived asynchronous tasks.* Calls to {@code execute} will reuse previously constructed* threads if available. If no existing thread is available, a new* thread will be created and added to the pool. Threads that have* not been used for sixty seconds are terminated and removed from* the cache. Thus, a pool that remains idle for long enough will* not consume any resources. Note that pools with similar* properties but different details (for example, timeout parameters)* may be created using {@link ThreadPoolExecutor} constructors.** @return the newly created thread pool*/public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}

注意到核心线程数传了0,即没有核心线程数,最大线程数为Integer.MAX_VALUE,通常系统都不会有这么多线程,可以认为是可以无限创建线程了。线程空闲60秒后会被回收,因此这是可缓存线程,缓存时间60秒。队列则使用SynchronousQueue作中转,因为核心线程数为0,因此根据线程池内部的运转流程,任务一来,如果当前没可用线程,必定直接放队列,然后队列的任务被线程池take出来创建新线程以执行任务。

最后是SingleThreadScheduledExecutor和ScheduledThreadPoolExecutor,这两个线程池可以定时执行任务,其阻塞队列用了DelayedWorkQueue,看这名称我们即隐约知道为啥它能定时执行任务了。DelayedWorkQueue 的特点是内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构。延时队列执行内部的原理大致是按时间排序好任务,通过锁的await方法和signal方法来控制延时执行,有兴趣可以翻阅ScheduledThreadPoolExecutor源码

private final ReentrantLock lock = new ReentrantLock();/*** Condition signalled when a newer task becomes available at the* head of the queue or a new thread may need to become leader.*/
private final Condition available = lock.newCondition();public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return finishPoll(first);first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;final ReentrantLock lock = this.lock;lock.lock();try {int i = size;if (i >= queue.length)grow();size = i + 1;if (i == 0) {queue[0] = e;setIndex(e, 0);} else {siftUp(i, e);}if (queue[0] == e) {leader = null;available.signal();}} finally {lock.unlock();}return true;}

线程拒绝策略

这个相对简单,正所谓水满则溢。线程池也有无能为力的情况,当队列、最大线程数都满后,再有任务过来,又无空闲线程可执行任务,此时线程池必须拒绝任务,拒绝任务的做法可以有很多种,直接丟了任务不处理(通常不会用),丢弃最旧的任务,由提交任务的线程自行执行等。下面我们看看JUC包默认的拒绝策略都有哪些。

我们看ThreadPoolExecutor的构造函数可知,需要执行拒绝策略,实现RejectedExecutionHandler接口即可。 

可以看出,RejectedExecutionHandler默认有四个实现

  • DiscardOldestPolicy
  • AbortPolicy
  • CallerRunsPolicy
  • DiscardPolicy

DiscardOldestPolicy策略会将队列中最旧的一个任务丢弃,即直接将队头的任务移除即可

    /*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}

注意Java阻塞队列的实现都是入队添加到队尾,从队头出队,所以上面的代码就是直接poll队头任务。

AbortPolicy实际就是直接拒绝执行任务,丢了个异常出来就不管了。

    /*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}

CallerRunsPolicy这个策略在线程池没能力执行任务时,要求提交任务的线程自行执行任务,这个策略也是相对安全的一种策略,建议使用。

    /*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.*/public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}

DiscardPolicy策略将直接把任务丢弃,连异常都不抛,这种策略就比较粗暴,不建议使用。

    /*** A handler for rejected tasks that silently discards the* rejected task.*/public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}

事实上上面四种策略,只有CallerRunsPolicy是推荐使用的,其它三种都有可能造成业务异常,数据丢失等问题,不建议使用。不管是丢弃任务还是抛异常,或是直接不管,都不是我们希望看到的。我们也可以自行实现这个接口,根据实际情况处理。