> 文章列表 > Springboot结合线程池的使用

Springboot结合线程池的使用

Springboot结合线程池的使用

1.使用配置文件配置线程的参数

配置文件

thread-pool:
  core-size: 100
  max-size: 100
  keep-alive-seconds: 60
  queue-capacity: 1

配置类

/**
 * Thread-pool sizing parameters, bound from the configuration properties under
 * the {@code thread-pool} prefix.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "thread-pool")
public class ThreadPoolConfig {

    /** Number of core threads ({@code thread-pool.core-size}). */
    private int coreSize;

    /** Maximum number of threads ({@code thread-pool.max-size}). */
    private int maxSize;

    /** Keep-alive time in seconds ({@code thread-pool.keep-alive-seconds}). */
    private int keepAliveSeconds;

    /** Capacity of the task queue ({@code thread-pool.queue-capacity}). */
    private int queueCapacity;
}

2.配置线程池并使用

方式一:线程池结合CompletableFuture来实现

配置线程池类

@Configuration
public class ThreadPoolTask1 {@Autowiredprivate ThreadPoolConfig threadPoolConfig;@Bean("task1")public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadPoolConfig.getCoreSize()); // 核心线程数executor.setMaxPoolSize(threadPoolConfig.getMaxSize()); // 最大线程数executor.setKeepAliveSeconds(threadPoolConfig.getKeepAliveSeconds()); // 非核心线程活跃时间executor.setQueueCapacity(threadPoolConfig.getQueueCapacity()); // 队列容量executor.setThreadNamePrefix("test-"); // 设置线程的前缀名executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 设置拒绝策略executor.setWaitForTasksToCompleteOnShutdown(false); // 是否在任务执行完后关闭线程池executor.initialize();return executor;}
}

CompletableFuture使用线程池进行调用

package com.example.demo;import com.example.demo.config.ThreadPoolTask1;
import com.sun.java.browser.plugin2.DOM;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.stream.Collectors;@RestController
public class Demo {@Autowiredprivate DemoServiceImpl demoService;@Autowired@Qualifier("task1")private ThreadPoolTaskExecutor threadPoolTask1;@GetMapping("thread1")public Map<String, String> thread1() {long start = System.currentTimeMillis();List<CompletableFuture<String>> futures = new ArrayList<>();Map<String, String> map = new HashMap<>();// 使用CompletableFuture的 supplyAsync来处理结果相当于submit, runAsync无返回相当于executefor (int i = 0; i < 100; i++) {int b = i;CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->demoService.executorTask1(b), threadPoolTask1);futures.add(future);}// 获取结果集CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) ->futures.stream().forEach(item -> {String result = null;try {result = item.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}map.put(result.split("-")[1], result.split("-")[0]);})).join();long end = System.currentTimeMillis();map.put("当前时间",(end- start) + "");return map;}
}

任务类

    /**
     * Simulates a slow task: prints the current thread name, sleeps for two seconds,
     * then returns {@code "aaa-" + i}.
     *
     * @param i task index
     * @return {@code "aaa-" + i}
     * @throws RuntimeException when {@code i == 6}, to demonstrate a failing task
     */
    public String executorTask1(int i) {
        System.out.println("当前线程-" + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // sleep() clears the interrupt flag; restore it so the pool can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        if (i == 6) {
            throw new RuntimeException("cs");
        }
        return "aaa-" + i;
    }

方式二:使用@EnableAsync和@Async方式实现

在启动类上加@EnableAsync注解

/**
 * Application entry point.
 *
 * <p>{@code @EnableAsync} switches on support for asynchronous tasks. It can be placed
 * on a separate configuration class instead of on this class.
 */
@SpringBootApplication
@EnableAsync
public class DemoApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

编写线程池配置

// 也可以在此处加上@EnableAsync注解,入口类上不加
@Configuration
public class ThradPoolTask{@Autowiredprivate ThreadPoolConfig threadPoolConfig;@Beanpublic Executor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadPoolConfig.getCoreSize()); // 核心线程数executor.setMaxPoolSize(threadPoolConfig.getMaxSize()); // 最大线程数executor.setKeepAliveSeconds(threadPoolConfig.getKeepAliveSeconds()); // 非核心线程活跃时间executor.setQueueCapacity(threadPoolConfig.getQueueCapacity()); // 队列容量executor.setThreadNamePrefix("test-"); // 设置线程的前缀名executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 设置拒绝策略executor.setWaitForTasksToCompleteOnShutdown(false); // 是否在任务执行完后关闭线程池executor.initialize();return executor;}
}

使用

/**
 * REST controller that runs 100 {@code @Async} tasks and gathers their {@link Future}
 * results.
 */
@RestController
public class Demo {

    @Autowired
    private DemoServiceImpl demoService;

    /**
     * Submits 100 asynchronous tasks and maps each result's suffix to its prefix.
     *
     * <p>A task that fails is logged and left out of the response; the request itself
     * still succeeds.
     *
     * @return the collected results plus the elapsed milliseconds under the key
     *         {@code "当前时间"}
     */
    @GetMapping("thread")
    public Map<String, String> thread() {
        long start = System.currentTimeMillis();
        Map<String, String> map = new HashMap<>();
        List<Future<String>> futures = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            futures.add(demoService.executorTask(i));
        }

        for (Future<String> item : futures) {
            try {
                String[] parts = item.get().split("-");
                map.put(parts[1], parts[0]);
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        long end = System.currentTimeMillis();
        map.put("当前时间", (end - start) + "");
        return map;
    }
}

任务类

    // 必须指明使用的是哪个线程池,taskExecutor不带的话用springboot默认注册的线程池// Future作为返回值,携带返回结果@Async("taskExecutor")public Future<String> executorTask(int i) {System.out.println("当前线程-" + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}if (i == 6) {throw new RuntimeException("cs");} else {// AsyncResult返回携带的结果return new AsyncResult<String>("aaa-" + i);}}

方式三:重写springboot默认的线程池配置

在启动类上加@EnableAsync注解

/**
 * Application entry point.
 *
 * <p>{@code @EnableAsync} is declared here; it can be placed on a separate
 * configuration class instead.
 */
@SpringBootApplication
@EnableAsync
public class DemoApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
@Configuration
public class ThreadPoolTask2 implements AsyncConfigurer {@Autowiredprivate ThreadPoolConfig threadPoolConfig;/* 修改默认线程池的配置 @return*/@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadPoolConfig.getCoreSize()); // 核心线程数executor.setMaxPoolSize(threadPoolConfig.getMaxSize()); // 最大线程数executor.setKeepAliveSeconds(threadPoolConfig.getKeepAliveSeconds()); // 非核心线程活跃时间executor.setQueueCapacity(threadPoolConfig.getQueueCapacity()); // 队列容量executor.setThreadNamePrefix("test2-"); // 设置线程的前缀名executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 设置拒绝策略executor.setWaitForTasksToCompleteOnShutdown(false); // 是否在任务执行完后关闭线程池executor.initialize();return executor;}/* 修改默认的异常处理* 注意:如果带有返回值Future,异常会被捕获,不会去执行该方法 @return*/@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return (ex, method, params) -> {System.out.println("任务执行的异常" + ex);System.out.println("执行的任务方法" + method.getName());for (Object param : params) {System.out.println("执执行任务的参数" + param);}};}
}

使用


/**
 * REST controller exercising the default {@code @Async} executor, with and without
 * return values.
 */
@RestController
public class Demo {

    @Autowired
    private DemoServiceImpl demoService;

    /**
     * Fires 100 asynchronous tasks that return nothing and does not wait for them.
     *
     * @return always {@code null}; this endpoint produces no response payload
     */
    @GetMapping("thread2")
    public Map<String, String> thread2() {
        for (int i = 0; i < 100; i++) {
            demoService.executorTask2(i);
        }
        return null;
    }

    /**
     * Submits 100 asynchronous tasks that return a value and maps each result's suffix
     * to its prefix.
     *
     * <p>A task that fails is logged and left out of the response, rather than being
     * dereferenced as {@code null}; the request itself still succeeds.
     *
     * @return the collected results plus the elapsed milliseconds under the key
     *         {@code "当前时间"}
     */
    @GetMapping("thread3")
    public Map<String, String> thread3() {
        long start = System.currentTimeMillis();
        Map<String, String> map = new HashMap<>();
        List<Future<String>> futures = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            futures.add(demoService.executorTask3(i));
        }

        for (Future<String> item : futures) {
            try {
                String[] parts = item.get().split("-");
                map.put(parts[1], parts[0]);
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        long end = System.currentTimeMillis();
        map.put("当前时间", (end - start) + "");
        return map;
    }
}

任务类

	// @Async不需要指定,使用默认即可// 出现异常会走AsyncUncaughtExceptionHandler方法@Asyncpublic void executorTask2(int i) {System.out.println("当前线程-" + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}int a = 1/0;}// 即使出现异常也不会走AsyncUncaughtExceptionHandler方法@Asyncpublic Future<String> executorTask3(int i) {System.out.println("当前线程-" + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}if (i == 6) {throw new RuntimeException("cs");} else {return new AsyncResult<String>("aaa-" + i);}}

关于三种方式在执行过程中出现异常时的表现

方式一:任务中的异常会导致请求失败,最好在任务内部对异常进行处理
在这里插入图片描述

方式二:请求成功了,关于6的那条数据并没有返回给前端
在这里插入图片描述

方式三:请求成功了,关于6的那条数据并没有返回给前端