> 文章列表 > java 线程池

java 线程池

java 线程池

一.简单的线程池设计:

java 线程池
线程池的执行示意图:
java 线程池

二. 线程池的核心参数:

java 线程池

三.线程池的处理流程:

java 线程池

四.线程池的阻塞队列

1.基于数组的有界阻塞队列
2.基于链表的有界阻塞队列
3.基于链表的无界阻塞队列
4.同步移交阻塞队列

1.基于数组的有界阻塞队列:

public static void main(String[] args) throws InterruptedException {// 创建一个有界队列,队列容量为10ArrayBlockingQueue queue = new ArrayBlockingQueue<Integer>(10);// 循环向队列添加元素for (int i = 0; i < 20; i++) {queue.put(i);System.out.println("向队列中添加值:" + i);}}

到了第10个就阻塞不动了:
java 线程池

2.基于链表的有界阻塞队列:

public static void main(String[] args) throws InterruptedException {//基于链表的有界阻塞队列,队列容量为10LinkedBlockingQueue queue = new LinkedBlockingQueue<Integer>(10);// 循环向队列添加元素for (int i = 0; i < 20; i++) {queue.put(i);System.out.println("向队列中添加值:" + i);}}

到了第10个就阻塞不动了:
java 线程池

3.基于链表的无界阻塞队列(上面的2不添加队列容量就是无界):

public static void main(String[] args) throws InterruptedException {//基于链表的无界阻塞队列,队列容量为无限LinkedBlockingQueue queue = new LinkedBlockingQueue<Integer>();// 循环向队列添加元素for (int i = 0; i < 20; i++) {queue.put(i);System.out.println("向队列中添加值:" + i);}}

循环的所有数据都执行完成‘
java 线程池
4.同步移交阻塞队列

public static void main(String[] args) throws InterruptedException {//同步移交阻塞队列SynchronousQueue queue = new SynchronousQueue<Integer>();//插入值new Thread(() -> {try {queue.put(1);System.out.println("删除成功");} catch (InterruptedException e) {e.printStackTrace();}}).start();// 删除值new Thread(() -> {try {queue.take();System.out.println("删除成功");} catch (InterruptedException e) {e.printStackTrace();}}).start();}

上面的插入值或者删除值缺少任何一个,都不会输出下面内容,会进入阻塞状态
java 线程池

五.线程池可选择的饱和策略:

1.AbortPolicy终止策略(默认)
2.DiscardPolicy抛弃策略
3.DiscardOldestPolicy抛弃旧任务策略
4.CallerRunsPolicy调用者运行策略

六.创建线程池的方法:

1.newCachedThreadPooljava 线程池
2.newFixedThreadPool
java 线程池
3.newSingleThreadExecutor
java 线程池

七.向线程池提交任务:

7.1 submit

 @Testpublic void submitTest() throws ExecutionException, InterruptedException {// 创建线程池ExecutorService threadPool =Executors.newCachedThreadPool();//利用submit方法提交任务,接收任务的返回结果Future<Integer> future = threadPool.submit(() -> {Thread.sleep(1000L * 10);return 2 *5;});//阻塞方法,直到任务有返回值后,才向下执行Integer num = future.get() ;System.out.println("执行结果:" + num);}

7.2 execute:

   @Testpublic void executeTest() throws InterruptedException {// 创建线程池ExecutorService threadPool = Executors.newCachedThreadPool();//利用execute方法提交任务,没有返回结果threadPool.execute(() -> {try {Thread.sleep(1000 * 10);} catch (InterruptedException e) {e.printStackTrace();}Integer num = 2 * 5;System.out.println("执行结果:" + num);});Thread.sleep(1000L*1000);}

java 线程池

八.线程池的状态:

java 线程池

九.线程池的饱和策略:

9.1线程池饱和策略之终止策略:

package com.smart.agriculture;import org.junit.After;
import org.junit.Test;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 饱和策略*/
public class PolicyTest {//线程池private static ThreadPoolExecutor executor =new ThreadPoolExecutor(//核心线程数2和最大线程数32, 3,//线程空闲后的存活时间60L, TimeUnit.SECONDS,//有界阻塞队列new LinkedBlockingQueue<Runnable>(5));/*** 要在线程池中执行的任务*/class Task implements Runnable {//任务名称private String taskName;public Task(String taskName) {this.taskName = taskName;}@Overridepublic void run() {System.out.println("线程【" + Thread.currentThread().getName() +"】正在执行【" + this.taskName + "】任务...");try {Thread.sleep(1000 * 5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程【" + Thread.currentThread().getName() +"】已执行完【" + this.taskName + "】任务!!!");}}/*** 线程池的执行过程* 2个核心线程* 5个任务的队列* 3个最大线程:1个线程可用* <p>* 前2个任务,会占用2个核心线程* 第3个到第7个任务,会暂存到任务队列中* 第8个任务,会启动最大线程,去执行* 第9,10个任务,没有线程可以去执行 报红*/@Testpublic void abortPolicyTest() {// 设置饱和策略为终止策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());for (int i = 1; i <= 10; i++) {try {//提交10个线程任务executor.execute(new Task("线程任务" + i));} catch (Exception e) {System.err.println(e);}// 关闭线程池executor.shutdown();}}//单元测试执行完,主线程等待100秒,防止主线程退出*@throws InterruptedException@Afterpublic void after() throws InterruptedException {Thread.sleep(1000 * 100);}
}

结果如下:
java 线程池
9.2 线程池之饱和策略之抛弃策略:

package com.smart.agriculture;import org.junit.After;
import org.junit.Test;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 抛弃策略*/
public class PolicyTest {//线程池private static ThreadPoolExecutor executor =new ThreadPoolExecutor(//核心线程数2和最大线程数32, 3,//线程空闲后的存活时间60L, TimeUnit.SECONDS,//有界阻塞队列new LinkedBlockingQueue<Runnable>(5));/*** 要在线程池中执行的任务*/class Task implements Runnable {//任务名称private String taskName;public Task(String taskName) {this.taskName = taskName;}@Overridepublic void run() {System.out.println("线程【" + Thread.currentThread().getName() +"】正在执行【" + this.taskName + "】任务...");try {Thread.sleep(1000 * 5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程【" + Thread.currentThread().getName() +"】已执行完【" + this.taskName + "】任务!!!");}}/*** 线程池的执行过程* 2个核心线程* 5个任务的队列* 3个最大线程:1个线程可用* <p>* 前2个任务,会占用2个核心线程* 第3个到第7个任务,会暂存到任务队列中* 第8个任务,会启动最大线程,去执行* 第9,10个任务,没有线程可以去执行 不会报红,直接被忽略* */@Testpublic void abortPolicyTest() {//设置饱和策略为 抛弃策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());for (int i = 1; i <= 10; i++) {try {//提交10个线程任务executor.execute(new Task("线程任务" + i));} catch (Exception e) {System.err.println(e);}// 关闭线程池executor.shutdown();}}//单元测试执行完,主线程等待100秒,防止主线程退出*@throws InterruptedException@Afterpublic void after() throws InterruptedException {Thread.sleep(1000 * 100);}
}

java 线程池
9.3 线程池之饱和策略之抛弃旧任务策略:

package com.smart.agriculture;import org.junit.After;
import org.junit.Test;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 抛弃旧任务策略*/
public class PolicyTest {//线程池private static ThreadPoolExecutor executor =new ThreadPoolExecutor(//核心线程数2和最大线程数32, 3,//线程空闲后的存活时间60L, TimeUnit.SECONDS,//有界阻塞队列new LinkedBlockingQueue<Runnable>(5));/*** 要在线程池中执行的任务*/class Task implements Runnable {//任务名称private String taskName;public Task(String taskName) {this.taskName = taskName;}@Overridepublic void run() {System.out.println("线程【" + Thread.currentThread().getName() +"】正在执行【" + this.taskName + "】任务...");try {Thread.sleep(1000 * 5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程【" + Thread.currentThread().getName() +"】已执行完【" + this.taskName + "】任务!!!");}}/*** 线程池的执行过程* 2个核心线程* 5个任务的队列* 3个最大线程:1个线程可用* <p>* 前2个任务,会占用2个核心线程* 第3个到第7个任务,会暂存到任务队列中* 第8个任务,会启动最大线程,去执行* 第9,10个任务,没有线程可以去执行 第9个任务取代第3个任务,第10给任务取代第4个任务暂存在任务队列中,第3,4个任务会丢失,不会被执行*/@Testpublic void abortPolicyTest() {//设置饱和策略为 抛弃旧任务策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());for (int i = 1; i <= 10; i++) {try {//提交10个线程任务executor.execute(new Task("线程任务" + i));} catch (Exception e) {System.err.println(e);}// 关闭线程池executor.shutdown();}}//单元测试执行完,主线程等待100秒,防止主线程退出*@throws InterruptedException@Afterpublic void after() throws InterruptedException {Thread.sleep(1000 * 100);}
}

java 线程池
9.4 线程池之饱和策略之调用者运行策略:

package com.smart.agriculture;import org.junit.After;
import org.junit.Test;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 调用者运行策略*/
public class PolicyTest {//线程池private static ThreadPoolExecutor executor =new ThreadPoolExecutor(//核心线程数2和最大线程数32, 3,//线程空闲后的存活时间60L, TimeUnit.SECONDS,//有界阻塞队列new LinkedBlockingQueue<Runnable>(5));/*** 要在线程池中执行的任务*/class Task implements Runnable {//任务名称private String taskName;public Task(String taskName) {this.taskName = taskName;}@Overridepublic void run() {System.out.println("线程【" + Thread.currentThread().getName() +"】正在执行【" + this.taskName + "】任务...");try {Thread.sleep(1000 * 5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程【" + Thread.currentThread().getName() +"】已执行完【" + this.taskName + "】任务!!!");}}/*** 线程池的执行过程* 2个核心线程* 5个任务的队列* 3个最大线程:1个线程可用* <p>* 前2个任务,会占用2个核心线程* 第3个到第7个任务,会暂存到任务队列中* 第8个任务,会启动最大线程,去执行* 第9没有线程可以去执行 调用者自己的主线程执行第9个任务* 到第10个线程,前2个任务执行完成,所以也开始执行第10个线程*/@Testpublic void abortPolicyTest() {//设置饱和策略为 调用者运行策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 1; i <= 10; i++) {try {//提交10个线程任务executor.execute(new Task("线程任务" + i));} catch (Exception e) {System.err.println(e);}// 关闭线程池executor.shutdown();}}//单元测试执行完,主线程等待100秒,防止主线程退出*@throws InterruptedException@Afterpublic void after() throws InterruptedException {Thread.sleep(1000 * 100);}
}

java 线程池