> 文章列表 > 为什么不推荐通过Executors直接创建线程池

为什么不推荐通过Executors直接创建线程池

为什么不推荐通过Executors直接创建线程池

最近看了一些文章,这里归纳总结一下,仅供参考。

       阿里发布的 Java开发手册中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式。这是为什么?

原因:为什么不推荐通过Executors直接创建线程池呢?

我们参考阿里巴巴的Java开发手册内容:

  • 8.   【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
  • 说明:Executors各个方法的弊端:
  • 1)    newFixedThreadPool和newSingleThreadExecutor:  主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
  • 2)    newCachedThreadPool和newScheduledThreadPool:  主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
  • 9. 【强制】创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。

有4个原因,前2个是主要原因。具体如下:

一、缓存队列 LinkedBlockingQueue 没有设置固定容量大小

1.1、Executors.newFixedThreadPool()

创建固定大小的线程池

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

ThreadPoolExecutor 部分参数:

  • corePoolSize :线程池中核心线程数的最大值。此处为 nThreads个。
  • maximumPoolSize :线程池中能拥有最多线程数 。此处为 nThreads 个。
  • LinkedBlockingQueue 用于缓存任务的阻塞队列 。 此处没有设置容量大小,默认是 Integer.MAX_VALUE,可以认为是无界的。

问题分析:

从源码中可以看出, 虽然表面上 newFixedThreadPool() 中定义了 核心线程数 和 最大线程数 都是固定 nThreads 个,但是当 线程数量超过 nThreads 时,多余的线程会保存到 LinkedBlockingQueue 中,而 LinkedBlockingQueue 没是无界的,导致其无限增大,最终内存撑爆。

1.2、Executors.newSingleThreadExecutor()

创建单个线程池 ,线程池中只有一个线程。

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

创建单个线程池 ,线程池中只有一个线程。

  • 优点: 创建一个单线程的线程池,保证线程的顺序执行 ;
  • 缺点: 与 newFixedThreadPool() 相同。

总结:

newFixedThreadPool()、newSingleThreadExecutor() 底层代码 中 LinkedBlockingQueue 没有设置容量大小,默认是 Integer.MAX_VALUE, 可以认为是无界的。线程池中 多余的线程会被缓存到 LinkedBlockingQueue中,最终内存撑爆。

二 、最大线程数量是 Integer.MAX_VALUE

2.1、Executors.newCachedThreadPool()

缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

ThreadPoolExecutor 部分参数:

  • corePoolSize :线程池中核心线程数的最大值。此处为 0 个。
  • maximumPoolSize :线程池中能拥有最多线程数 。此处为 Integer.MAX_VALUE 。可以认为是无限大 。

优缺点:

  1. 优点: 很灵活,弹性的线程池线程管理,用多少线程给多大的线程池,不用后及时回收,用则新建 ;
  2. 缺点: 从源码中可以看出,SynchronousQueue() 只能存一个队列,可以认为所有 放到 newCachedThreadPool() 中的线程,不会缓存到队列中,而是直接运行的, 由于最大线程数是 Integer.MAX_VALUE ,这个数量级可以认为是无限大了, 随着执行线程数量的增多 和 线程没有及时结束,最终会将内存撑爆。

2.2、Executors.newScheduledThreadPool()

创建固定大小的线程,可以延迟或定时的执行任务

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}// ScheduledThreadPoolExecutor 类的源码:
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);
}
  • 优点: 创建一个固定大小线程池,可以定时或周期性的执行任务 ;
  • 缺点: 与 newCachedThreadPool() 相同。

总结:

newCachedThreadPool()、newScheduledThreadPool() 的 底层代码 中 的 最大线程数(maximumPoolSize) 是 Integer.MAX_VALUE,可以认为是无限大,如果线程池中,执行中的线程没有及时结束,并且不断地有线程加入并执行,最终会将内存撑爆。

三、拒绝策略不能自定义(这个不是重点)

它们统一缺点:不支持自定义拒绝策略。

Executors 底层其实是使用的 ThreadPoolExecutor 的方式 创建的,但是使用的是 ThreadPoolExecutor 的默认策略,即 AbortPolicy。

//默认策略private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();//构造函数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}

四、创建线程 或 线程池时请指定有意义的线程名称,方便出错时回溯(这个不是重点)

这个第四条原因参考了: https://blog.csdn.net/w605283073/article/details/80259493
 

五、推荐创建方式

//ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();
ThreadFactory threadFactory = new CustomizableThreadFactory("thread-call-runner-");int size = services.size();
ExecutorService executorService = new ThreadPoolExecutor(size,size,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(512),namedThreadFactory);

六、创建 ThreadFactory 设置线程名称

第一种 CustomizableThreadFactory

Spring 框架提供的 CustomizableThreadFactory。

ThreadFactory springThreadFactory = new CustomizableThreadFactory("springThread-pool-");ExecutorService exec = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10),springThreadFactory);
exec.submit(() -> {logger.info("--记忆中的颜色是什么颜色---");
});

第二种 ThreadFactoryBuilder

Google guava 工具类 提供的 ThreadFactoryBuilder ,使用链式方法创建。

ThreadFactory guavaThreadFactory = new ThreadFactoryBuilder().setNameFormat("retryClient-pool-").build();ExecutorService exec = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10),guavaThreadFactory );
exec.submit(() -> {logger.info("--记忆中的颜色是什么颜色---");
});

第三种 BasicThreadFactory

Apache commons-lang3 提供的 BasicThreadFactory.

ThreadFactory basicThreadFactory = new BasicThreadFactory.Builder().namingPattern("basicThreadFactory-").build();ExecutorService exec = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10),basicThreadFactory );
exec.submit(() -> {logger.info("--记忆中的颜色是什么颜色---");
});

最终本质都是 给 java.lang.Thread#name 设置名称,详情源码感兴趣的可以自行查看。

final Thread thread = new Thread();
thread.setName(name);
 

七、ExecutorService 源码

7.1 创建线程

看看源码吧,从newFixedThreadPool开始。

ExecutorService newFixedThreadPool(int nThreads):固定大小线程池。

      可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表名什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的

public static ExecutorService newFixedThreadPool(int nThreads) {  

        return new ThreadPoolExecutor(nThreads, nThreads,  

                                      0L, TimeUnit.MILLISECONDS,  

                                      new LinkedBlockingQueue<Runnable>());  

    }  

ExecutorService newSingleThreadExecutor():单线程。

      可以看到,与fixedThreadPool很像,只不过fixedThreadPool中的入参直接退化为1

public static ExecutorService newSingleThreadExecutor() {  

        return new FinalizableDelegatedExecutorService  

            (new ThreadPoolExecutor(1, 1,  

                                    0L, TimeUnit.MILLISECONDS,  

                                    new LinkedBlockingQueue<Runnable>()));  

    }  

ExecutorService newCachedThreadPool():无界线程池,可以进行自动线程回收。

       这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该QUEUE中,每个插入操作必须等待另一个

       线程的对应移除操作。比如,我先添加一个元素,接下来如果继续想尝试添加则会阻塞,直到另一个线程取走一个元素,反之亦然。(想到什么?就是缓冲区为1的生产者消费者模式^_^)

注意到介绍中的自动回收线程的特性吗,为什么呢?先不说,但注意到该实现中corePoolSize和maximumPoolSize的大小不同。

public static ExecutorService newCachedThreadPool() {  

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  

                                      60L, TimeUnit.SECONDS,  

                                      new SynchronousQueue<Runnable>());  

    }  

===============================神奇分割线==================================

7.2 分析

       到此如果有很多疑问,那是必然了(除非你也很了解了)

       先从BlockingQueue<Runnable> workQueue这个入参开始说起。在JDK中,其实已经说得很清楚了,一共有三种类型的queue。以下为引用:(我会稍微修改一下,并用红色突出显示)

       所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:

  • 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(什么意思?如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(thread)开始运行)
  • 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程
  • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

先不着急举例子,因为首先需要知道queue上的三种类型。

7.2 策略

排队有三种通用策略:

  1. 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
  2. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
  3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。  

===============================神奇分割线==================================

      到这里,该了解的理论已经够多了,可以调节的就是corePoolSize和maximumPoolSizes 这对参数还有就是BlockingQueue的选择。

例子一:使用直接提交策略,也即SynchronousQueue。

      首先SynchronousQueue是无界的,也就是说他存数任务的能力是没有限制的,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。在这里不是核心线程便是新创建的线程,但是我们试想一样下,下面的场景。

       我们使用一下参数构造ThreadPoolExecutor:

new ThreadPoolExecutor(  

            2, 3, 30, TimeUnit.SECONDS,   

            new <span style="white-space: normal;">SynchronousQueue</span><Runnable>(),   

            new RecorderThreadFactory("CookieRecorderPool"),   

            new ThreadPoolExecutor.CallerRunsPolicy());  

 当核心线程已经有2个正在运行.

  1. 此时继续来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。”,所以A被添加到queue中。
  2. 又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去。
  3. 此时便满足了上面提到的“如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。”,所以必然会新建一个线程来运行这个任务。
  4. 暂时还可以,但是如果这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了maximumPoolSize,所以只好执行异常策略了。

        所以在使用SynchronousQueue通常要求maximumPoolSize是无界的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。

        什么意思?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中

例子二:使用无界队列策略,即LinkedBlockingQueue

这个就拿newFixedThreadPool来说,根据前文提到的规则:

 写道

如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。

 那么当任务继续增加,会发生什么呢?

 写道

如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。

 OK,此时任务变加入队列之中了,那什么时候才会添加新线程呢?

 写道

如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

 这里就很有意思了,可能会出现无法加入队列吗?不像SynchronousQueue那样有其自身的特点,对于无界队列来说,总是可以加入的(资源耗尽,当然另当别论)。换句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。所以要防止任务疯长,比如任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,而且还不断增加,如果任务内存大一些,不一会儿就爆了,呵呵。

可以仔细想想哈。

例子三:有界队列,使用ArrayBlockingQueue。

        这个是最为复杂的使用,所以JDK不推荐使用也有些道理。与上面的相比,最大的特点便是可以防止资源耗尽的情况发生。

举例来说,请看如下构造方法:

new ThreadPoolExecutor(  

            2, 4, 30, TimeUnit.SECONDS,   

            new ArrayBlockingQueue<Runnable>(2),   

            new RecorderThreadFactory("CookieRecorderPool"),   

            new ThreadPoolExecutor.CallerRunsPolicy());  

假设,所有的任务都永远无法执行完。

        对于首先来的A,B来说直接运行,接下来,如果来了C,D,他们会被放到queu中,如果接下来再来E,F,则增加线程运行E,F。但是如果再来任务,队列无法再接受了,线程数也到达最大的限制了,所以就会使用拒绝策略来处理。

总结:

  1. ThreadPoolExecutor的使用还是很有技巧的。
  2. 使用无界queue可能会耗尽系统资源。
  3. 使用有界queue可能不能很好的满足性能,需要调节线程数和queue大小
  4. 线程数自然也有开销,所以需要根据不同应用进行调节。

通常来说对于静态任务可以归为:

  1. 数量大,但是执行时间很短
  2. 数量小,但是执行时间较长
  3. 数量又大执行时间又长
  4. 除了以上特点外,任务间还有些内在关系

八、参考

https://xiaojin21cen.blog.csdn.net/article/details/87269126

Executors Java编程规范插件提示手动创建线程池的解决办法_明明如月学长的博客-CSDN博客

Java线程池中三种方式创建 ThreadFactory 设置线程名称_customizablethreadfactory_阿飞云的博客-CSDN博客

ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别-阿里云开发者社区