> 文章列表 > SpringBoot线程池和Java线程池的用法和实现原理

SpringBoot线程池和Java线程池的用法和实现原理

SpringBoot线程池和Java线程池的用法和实现原理

文章目录

  • 使用默认的线程
    • 方式一:通过`@Async`注解调用
    • 方式二:直接注入 `ThreadPoolTaskExecutor`
    • 线程池默认配置信息
  • SpringBoot 线程池的实现原理
    • 覆盖默认的线程池
    • 管理多个线程池
  • JAVA常用的四种线程池
    • newCachedThreadPool
    • newFixedThreadPool
    • newScheduledThreadPool
    • newSingleThreadExecutor
  • Java 线程池中的四种拒绝策略
    • CallerRunsPolicy
    • AbortPolicy
    • DiscardPolicy
    • DiscardOldestPolicy
  • java 线程复用的原理

使用默认的线程池

方式一:通过@Async注解调用

public class AsyncTest {@Asyncpublic void async(String name) throws InterruptedException {System.out.println("async" + name + " " + Thread.currentThread().getName());Thread.sleep(1000);}
}

启动类上需要添加@EnableAsync注解,否则不会生效。

@SpringBootApplication
//@EnableAsync
public class Test1Application {public static void main(String[] args) throws InterruptedException {ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);AsyncTest bean = run.getBean(AsyncTest.class);for(int index = 0; index <= 10; ++index){bean.async(String.valueOf(index));}}
}

方式二:直接注入 ThreadPoolTaskExecutor

此时可不加 @EnableAsync注解

@SpringBootTest
class Test1ApplicationTests {@ResourceThreadPoolTaskExecutor threadPoolTaskExecutor;@Testvoid contextLoads() {Runnable runnable = () -> {System.out.println(Thread.currentThread().getName());};for(int index = 0; index <= 10; ++index){threadPoolTaskExecutor.submit(runnable);}}}

线程池默认配置信息

SpringBoot线程池的常见配置:

spring:task:execution:pool:core-size: 8max-size: 16                          # 默认是 Integer.MAX_VALUEkeep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止allow-core-thread-timeout: true       # 是否允许核心线程超时,默认truequeue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUEshutdown:await-termination: false              # 线程关闭等待thread-name-prefix: task-               # 线程名称的前缀

SpringBoot 线程池的实现原理

TaskExecutionAutoConfiguration 类中定义了 ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的 ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中 RejectedExecutionHandler 被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

注意在TaskExecutionAutoConfiguration 类中,ThreadPoolTaskExecutor类的bean的名称为: applicationTaskExecutortaskExecutor

// TaskExecutionAutoConfiguration#applicationTaskExecutor()
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {return builder.build();
}
// ThreadPoolTaskExecutor#initializeExecutor()
@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);ThreadPoolExecutor executor;if (this.taskDecorator != null) {executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,queue, threadFactory, rejectedExecutionHandler) {@Overridepublic void execute(Runnable command) {Runnable decorated = taskDecorator.decorate(command);if (decorated != command) {decoratedTaskMap.put(decorated, command);}super.execute(decorated);}};}else {executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,queue, threadFactory, rejectedExecutionHandler);}if (this.allowCoreThreadTimeOut) {executor.allowCoreThreadTimeOut(true);}this.threadPoolExecutor = executor;return executor;
}
// ExecutorConfigurationSupport#initialize()
public void initialize() {if (logger.isInfoEnabled()) {logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));}if (!this.threadNamePrefixSet && this.beanName != null) {setThreadNamePrefix(this.beanName + "-");}this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}

覆盖默认的线程池

覆盖默认的 taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor

@Configuration
public class ThreadPoolConfiguration {@Bean("taskExecutor")public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();//设置线程池参数信息taskExecutor.setCorePoolSize(10);taskExecutor.setMaxPoolSize(50);taskExecutor.setQueueCapacity(200);taskExecutor.setKeepAliveSeconds(60);taskExecutor.setThreadNamePrefix("myExecutor--");taskExecutor.setWaitForTasksToCompleteOnShutdown(true);taskExecutor.setAwaitTerminationSeconds(60);//修改拒绝策略为使用当前线程执行taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//初始化线程池taskExecutor.initialize();return taskExecutor;}
}

管理多个线程池

如果出现了多个线程池,例如再定义一个线程池 taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

@Bean("taskExecutor2")
public ThreadPoolTaskExecutor taskExecutor2() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();//设置线程池参数信息taskExecutor.setCorePoolSize(10);taskExecutor.setMaxPoolSize(50);taskExecutor.setQueueCapacity(200);taskExecutor.setKeepAliveSeconds(60);taskExecutor.setThreadNamePrefix("myExecutor2--");taskExecutor.setWaitForTasksToCompleteOnShutdown(true);taskExecutor.setAwaitTerminationSeconds(60);//修改拒绝策略为使用当前线程执行taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//初始化线程池taskExecutor.initialize();return taskExecutor;
}

引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

@Resource
ThreadPoolTaskExecutor taskExecutor2;

对于使用@Async注解的多线程则在注解中指定bean的名字即可。

    @Async("taskExecutor2")public void async(String name) throws InterruptedException {System.out.println("async" + name + " " + Thread.currentThread().getName());Thread.sleep(1000);}

线程池的四种拒绝策略

JAVA常用的四种线程池

ThreadPoolExecutor 类的构造函数如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}

newCachedThreadPool

不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

newFixedThreadPool

定长线程池,超出线程数的任务会在队列中等待。

return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

newScheduledThreadPool

类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}

周期执行:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(()->{System.out.println("rate");
}, 1, 1, TimeUnit.SECONDS);

延时执行:

scheduledThreadPool.schedule(()->{System.out.println("delay 3 seconds");
}, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

单线程线程池,可以实现线程的顺序执行。

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

Java 线程池中的四种拒绝策略

  • CallerRunsPolicy:线程池让调用者去执行。

  • AbortPolicy:如果线程池拒绝了任务,直接报错。

  • DiscardPolicy:如果线程池拒绝了任务,直接丢弃。

  • DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

CallerRunsPolicy

直接在主线程中执行了run方法。

public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}
}

效果类似于:

Runnable thread = ()->{System.out.println(Thread.currentThread().getName());try {Thread.sleep(0);} catch (InterruptedException e) {throw new RuntimeException(e);}
};thread.run();

AbortPolicy

直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}
}

DiscardPolicy

什么也不做。

public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}

DiscardOldestPolicy

  • e.getQueue().poll() : 取出队列最旧的任务。

  • e.execute(r) : 当前任务入队。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
}

java 线程复用的原理

java的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker 对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{/* This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/ Thread this worker is running in.  Null if factory fails. */final Thread thread;/ Initial task to run.  Possibly null. */Runnable firstTask;/ Per-thread task counter */volatile long completedTasks;/* Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/ Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}

work对象的执行依赖于 runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}