> 文章列表 > 03 | 线程池:业务代码最常用也最容易犯错的组件

03 | 线程池:业务代码最常用也最容易犯错的组件

03 | 线程池:业务代码最常用也最容易犯错的组件

03 | 线程池:业务代码最常用也最容易犯错的组件

线程池的声明需要手动进行

手动 new ThreadPoolExecutor 来创建线程池
newFixedThreadPool 和 newCachedThreadPool如果使用这两个方法,可能会因为资源耗尽导致OOM 问题。
初始化一个单线程的 FixedThreadPool,循环 1 亿次向线程池提交任务,每个任务都会创建一个比较大的字符串然后休眠一小时:

ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThre
//打印线程池的信息,稍后我会解释这段代码
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString()
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
}
log.info(payload);
});
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);

其实newFixedThreadPool 是创建一个可重用固定线程数的线程池,但是使用共享的无界队列方式来运行这些线程。工作队列使用LinkedBlockingQueue,Integer.MAX_VALUE 长度的队列,可以认为是无界的,如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM。
如果改为使用 newCachedThreadPool:1.可根据需要创建新线程的线程池 2.旧的线程可用时将重用他们 3.对短期异步的程序 可提高程序性能
这种线程池的最大线程数是 Integer.MAX_VALUE,是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,要有请求到来,就必须有一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。线程是需要分配一定的内存空间作为线程栈的,比如 1MB,因此无限制创建线程必然会导致 OOM。
生产事件:用户注册后,我们调用一个外部服务去发送短信,发送短信接口正常时可以在 100 毫秒内响应,TPS 100 的注册量,CachedThreadPool 能稳定在占用 10 个左右线程的情况下满足需求。在某个时间点,外
部短信服务不可用了,我们调用这个服务的超时又特别长, 比如 1 分钟,1 分钟可能就进来了 6000 用户,产生 6000 个发送短信的任务,需要 6000 个线程,没多久就因为无法创建线程导致了 OOM,整个应用程序崩溃。

我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数。
任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题。

线程池线程管理策略详解

简单的监控

private void printStats(ThreadPoolExecutor threadPool) {Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {log.info("=========================");log.info("Pool Size: {}", threadPool.getPoolSize());log.info("Active Threads: {}", threadPool.getActiveCount());log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCoxxx());log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size())log.info("=========================");}, 0, 1, TimeUnit.SECONDS);
}

线程池的工作方式:

  1. 不会初始化 corePoolSize 个线程,有任务来了才创建工作线程;
  2. 当核心线程满了之后不会立即扩容线程池,而是把任务堆积到工作队列中;
  3. 当工作队列满了后扩容线程池,一直到线程个数达到 maximumPoolSize 为止;
  4. 如果队列已满且达到了最大线程后还有任务进来,按照拒绝策略处理;
  5. 当线程数大于核心线程数时,线程等待 keepAliveTime 后还是没有任务需要处理的话,收缩线程到核心线程数。

很早之前就想过:目前的线程池都是先放满了队列,才会扩容线程池,如果队列非常大,那么最大线程数就没什么意义了。能不能,核心线程数满了,先扩大线程数,最后再放进队列里面。
思路

  1. 由于线程池在工作队列满了无法入队的情况下会扩容线程池,那么我们是否可以重写队列的 offer 方法,造成这个队列已满的假象呢?
  2. 由于我们 Hack 了队列,在达到了最大线程后势必会触发拒绝策略,那么能否实现一个自定义的拒绝策略处理程序,这个时候再把任务真正插入队列呢?

Tomcat就是这样实现的。

务必确认清楚线程池本身是不是复用的

案例:某项目生产环境时不时有报警提示线程数过多,超过2000 个,收到报警后查看监控发现,瞬时线程数比较多但过一会儿又会降下来,线程数抖动很厉害,而应用的访问量变化不大。
我们在线程数比较高的时候进行线程栈抓取,抓取后发现内存中有 1000 多个自定义线程池。一般而言,线程池肯定是复用的,有 5 个以内的线程池都可以认为正常,而 1000 多个线程池肯定不正常。
我们没有搜到声明线程池的地方,搜索 execute 关键字后定位到,原来是业务代码调用了一个类库来获得线程池,类似如下的业务代码:调用 ThreadPoolHelper的 getThreadPool 方法来获得线程池,然后提交数个任务到线程池处理,看不出什么异常
来到 ThreadPoolHelper 的实现让人大跌眼镜,getThreadPool 方法居然是每次都使用 Executors.newCachedThreadPool 来创建一个线程池

class ThreadPoolHelper {public static ThreadPoolExecutor getThreadPool() {// 线程池没有复用return (ThreadPoolExecutor) Executors.newCachedThreadPool();}
}

解决方法:使用一个静态字段来存放线程池的引用,返回线程池的代码直接返回这个静态字段即可。

class ThreadPoolHelper {private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor10, 50,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000),new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get());public static ThreadPoolExecutor getRightThreadPool() {return threadPoolExecutor;}
}

需要仔细斟酌线程池的混用策略

要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列:

  1. 对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队列。
  2. 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2

场景:业务代码使用了线程池异步处理一些内存中的数据,但通过监控发现处理得非常慢。而且也不设计io操作。
排查发现业务代码使用的线程池,还被一个后台的文件批处理任务用到了。
这个线程池只有 2 个核心线程,最大线程也是 2,使用了容量为100 的 ArrayBlockingQueue 作为工作队列,使用了 CallerRunsPolicy 拒绝策略
模拟代码:

@PostConstruct
public void init() {printStats(threadPool);new Thread(() -> {//模拟需要写入的大量数据String payload = IntStream.rangeClosed(1, 1_000_000).mapToObj(__ -> "a").collect(Collectors.joining(""));while (true) {threadPool.execute(() -> {try {//每次都是创建并写入相同的数据到相同的文件Files.write(Paths.get("demo.txt"), Collections.singletonLis} catch (IOException e) {e.printStackTrace();}log.info("batch file processing done");});}}).start();
}

线程池的 2 个线程始终处于活跃状态,队列也基本处于打满状态,因为开启了CallerRunsPolicy 拒绝处理策略,所以当线程满载队列也满的情况下,任务会在提交任务的线程,或者说调用 execute 方法的线程执行,也就是说不能认为提交到线程池的任务就一定是异步处理的。如果使用了 CallerRunsPolicy 策略,那么有可能异步任务变为同步执行。
所以直接使用这个线程池进行异步计算的话,当线程池饱和的时候,计算任务会在执行 Web 请求的 Tomcat 线程执行,这时就会进一步影响到其他同步处理的线程,甚至造成整个应用程序崩溃。
解决:自己新建一个线程池,调整线程数,不使用CallerRunsPolicy拒绝策略。

Java 8 的 parallel stream 功能,可以让我们很方便地并行处理集合中的元素,其背后是共享同一个 ForkJoinPool,默认并行度是CPU 核数 -1。
核心线程数不会被回收。