【Executors、ThreadPoolExecutor】
Executors
在Java中,创建线程池可以使用java.util.concurrent包中的ExecutorService接口。这个接口可以通过Executors工厂类来创建。Executors提供了许多静态工厂方法来创建不同类型的线程池。例如,Executors.newFixedThreadPool(int nThreads)方法可以创建一个固定大小的线程池,该线程池将重用指定数量的线程。
虽然Executors工厂类提供了一种方便的方式来创建线程池,但是在某些情况下,它可能不是最佳选择。原因如下:
1、无法控制线程池的内部实现:Executors工厂类创建的线程池实现是预定义的,无法进行自定义调整。2、不支持设置拒绝策略:Executors工厂类创建的线程池默认情况下使用的是抛出RejectedExecutionException的拒绝策略,无法设置其他拒绝策略。3、不支持设置线程工厂:Executors工厂类创建的线程池默认情况下使用的是Executors.DefaultThreadFactory线程工厂类,无法设置其他线程工厂类。
因此,在某些情况下,使用Executors工厂类创建线程池可能不是最佳选择。在这种情况下,可以手动创建线程池,以便可以控制线程池的内部实现、拒绝策略和线程工厂类。
Executors工厂类提供了一种方便的方式来创建线程池实例。可以使用以下方法之一来创建线程池:
newFixedThreadPool(int nThreads): 创建一个固定大小的线程池,该线程池包含指定数量的线程。
ExecutorService executor = Executors.newFixedThreadPool(5);
newCachedThreadPool(): 创建一个可缓存的线程池,该线程池会根据需要自动创建新的线程。
ExecutorService executor = Executors.newCachedThreadPool();
newSingleThreadExecutor(): 创建一个单线程的线程池。
ExecutorService executor = Executors.newSingleThreadExecutor();
这些方法返回一个ExecutorService对象,它表示线程池实例。可以使用该对象提交任务给线程池来执行。例如:
executor.submit(new Runnable() {public void run() {// 任务代码}
});
需要注意的是,创建的线程池应该在使用完毕后被关闭,可以调用shutdown()方法来关闭线程池:
executor.shutdown();
手动创建线程池
其中线程池包括一个核心线程池大小为10、最大线程池大小为20、线程空闲时间为60秒、使用阻塞队列作为任务队列、自定义的线程工厂类和自定义的拒绝策略:
package com.lfsun.main.basic.mythreadpool;import com.lfsun.common.util.ExecutorUtil;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.*;/* 手动创建线程池*/
@Slf4j
public class MyManualThreadPool {public static final Logger logger = LoggerFactory.getLogger(MyManualThreadPool.class);public static void main(String[] args) {// 核心线程池大小int corePoolSize = 10;// 最大线程池大小int maxPoolSize = 20;// 空闲线程存活时间long keepAliveTime = 60L;// 时间单位TimeUnit timeUnit = TimeUnit.SECONDS;// 任务队列,使用有界队列可以避免内存溢出BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);// 线程工厂,可以设置线程名称等属性ThreadFactory threadFactory = new MyThreadFactory();// 自定义拒绝策略,避免任务丢失RejectedExecutionHandler handler = new MyRejectedExecutionHandler();// 创建线程池ExecutorService executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, queue, threadFactory, handler);// 提交任务for (int i = 1; i <= 100; i++) {executor.submit(new MyTask(i));}ExecutorUtil.shutdown(executor);}/* 线程工厂,用于创建线程时设置线程名称等属性*/private static class MyThreadFactory implements ThreadFactory {private static int counter = 1;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "我的线程-" + counter++);}}/* 任务类,实现Runnable接口*/private static class MyTask implements Runnable {private final int taskId;public MyTask(int taskId) {this.taskId = taskId;}@Overridepublic void run() {System.out.println("任务 -" + taskId + " 运行,线程 " + Thread.currentThread().getName());// 执行任务逻辑,避免使用Thread.sleep()阻塞线程try {// 模拟任务执行耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务 -" + taskId + " 完成,线程 " + Thread.currentThread().getName());}}/* 自定义拒绝策略,避免任务丢失*/private static class MyRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("任务 " + r.toString() + " 被拒绝,执行器 " + executor.toString());// 自定义处理逻辑,例如重新提交任务或记录日志// executor.submit(r);logger.warn("任务被拒绝:{}", r);}}
}
在这个示例代码中,MyThreadFactory和MyRejectedExecutionHandler是自定义的线程工厂类和拒绝策略。MyTask是自定义的任务类,每个任务都会打印出自己的任务ID,并在执行完成后打印出自己的任务ID和执行该任务的线程名称。
该示例创建了一个ThreadPoolExecutor实例,并向其提交100个MyTask任务。当所有任务都完成后,通过调用shutdown()方法来关闭线程池。
package com.lfsun.common.util;import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;/
* 线程池工具类* @author Administrator*/
public class ExecutorUtil {/* 关闭线程池,等待已提交的任务执行完毕,不再接受新的任务提交* shutdown()方法是ThreadPoolExecutor类中用来关闭线程池的方法。调用该方法会将线程池设置为不再接受新的任务,但会等待所有已经提交的任务执行完成。* 在所有任务完成后,线程池会自动关闭。 具体来说,shutdown()方法会做以下几件事情: 停止接受新的任务:将线程池的isShutdown属性设置为true,表示线程池已经关闭,不再接受新的任务。* 尝试停止所有活动的线程:会尝试中断所有正在执行任务的线程。* 等待所有任务执行完成:会等待所有已经提交的任务执行完成,并关闭线程池。* 注意:如果在调用shutdown()方法后又向线程池提交新的任务,这些任务将会被拒绝并抛出RejectedExecutionException异常。* 如果需要继续向线程池提交任务,可以使用shutdownNow()方法来强制关闭线程池,但这会导致所有未完成的任务被取消并且可能会导致数据丢失。* @param executor 线程池对象*/public static void shutdown(ExecutorService executor) {executor.shutdown();}/* 关闭线程池,尝试终止正在执行的任务,不再接受新的任务提交* executor.shutdown() 方法将不再接受新任务,并等待之前提交的任务完成执行。* 在这个方法返回之前,所有的线程都将执行完当前正在执行的任务,并且所有的空闲线程将会停止。* 调用此方法后,可以调用awaitTermination(long timeout, TimeUnit unit)等待所有任务执行完成。* <p>* executor.shutdownNow() 方法将尝试停止当前正在执行的任务,并立即停止所有空闲线程。* 这个方法不会等待所有任务完成执行。它会尝试中断线程池中所有的线程,并返回尚未开始执行的任务的列表。* <p>* 需要注意的是,executor.shutdownNow() 并不能保证所有的任务都能被终止,因为一些任务可能会忽略中断请求而继续执行。 @param executor 线程池对象* @return*/public static List<Runnable> shutdownNow(ExecutorService executor) {// 返回尚未执行的任务(即在调用 shutdownNow() 方法时被取消的任务)。// 每个元素都是一个实现了 Runnable 接口的对象,表示一个尚未执行的任务。// 如果 shutdownNow() 方法被调用时,没有任务被取消,则返回空列表。return executor.shutdownNow();}/* 等待线程池中已提交的任务在指定的时间内执行完毕 用于等待所有通过Executor提交的任务完成执行,并且在超时时间内等待所有任务完成。 具体而言,该方法会阻塞当前线程,直到以下两种情况之一发生:* * 所有通过executor提交的任务都已经完成执行。* * 等待时间已经超过timeout指定的时长。* @param executor 线程池对象* @param timeout 最长等待时间* @param unit 时间单位* @return 如果所有任务都已完成,则返回 true;否则为 false* @throws InterruptedException 如果等待过程中被中断,则抛出 InterruptedException 异常*/public static boolean awaitTermination(ExecutorService executor, long timeout, TimeUnit unit) throws InterruptedException {// 如果在等待时间内所有任务都完成,则返回true;否则返回false。// 如果该方法返回false,则不代表所有任务都没有完成,仅代表在等待时间内没有等待所有任务完成。// 如果需要确保所有任务都完成,可以在该方法返回false之后继续调用shutdownNow()方法来中断未完成的任务。return executor.awaitTermination(timeout, unit);}/* 等待线程池中的所有任务执行完毕,不限制等待时间* 方法使用了 ExecutorService 类的 awaitTermination 方法来实现等待操作。* 该方法将会一直等待直到所有任务执行完毕,或者等待过程中被中断,此时会抛出 InterruptedException 异常。 这个方法的作用是确保在某些情况下,线程池中的任务都能够被正确地执行完毕,而不会因为等待超时或者中断等原因而导致任务未能执行完毕。* @param executor 线程池对象* @throws InterruptedException 如果等待过程中被中断,则抛出 InterruptedException 异常*/public static boolean awaitTerminationForever(ExecutorService executor) throws InterruptedException {// 如果返回 true,表示所有任务已经执行完毕;如果返回 false,则表示等待过程中被中断。return executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}
}