
JUC Concurrent Programming: CompletableFuture


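Future Interface Theory

The Future interface (implemented by FutureTask) defines the operations available on an asynchronous task: getting the task's result, cancelling the task, checking whether it was cancelled, and checking whether it has finished.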

[Figure: method diagram]

[Figure: class diagram]

Code example:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // FutureTask wraps a Callable so a Thread can run it and the caller can fetch the result
        FutureTask<String> futureTask = new FutureTask<>(new MyThread2());
        Thread t1 = new Thread(futureTask, "t1");
        t1.start();
        // get() blocks until call() has returned
        System.out.println(futureTask.get());
    }
}

// Runnable: runs on another thread, no return value
class MyThread implements Runnable {
    @Override
    public void run() {
    }
}

// Callable: runs on another thread and returns a value
class MyThread2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("----- come in call()");
        return "hello Callable";
    }
}
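The example above only exercises get(). The other operations the Future interface defines (cancel, isCancelled, isDone) could be sketched as follows. The class name FutureCancelSketch and the 10-second sleep are illustrative assumptions, not part of the original article.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class FutureCancelSketch {

    // Starts a slow task, cancels it, and returns
    // {cancel() result, isCancelled(), isDone(), whether get() threw CancellationException}.
    static boolean[] cancelSlowTask() {
        FutureTask<String> task = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(10); // stands in for slow work (assumed duration)
            return "never returned";
        });
        new Thread(task, "worker").start();

        // true = interrupt the worker thread if the task is already running
        boolean cancelled = task.cancel(true);

        boolean getThrew = false;
        try {
            task.get();
        } catch (CancellationException e) {
            getThrew = true; // a cancelled task has no result to hand back
        } catch (InterruptedException | ExecutionException e) {
            // not expected in this sketch
        }
        return new boolean[] {cancelled, task.isCancelled(), task.isDone(), getThrew};
    }

    public static void main(String[] args) {
        boolean[] state = cancelSlowTask();
        System.out.println("cancel() returned:  " + state[0]);
        System.out.println("isCancelled():      " + state[1]);
        System.out.println("isDone():           " + state[2]);
        System.out.println("get() threw:        " + state[3]);
    }
}
```

Note that isDone() also reports true for a cancelled task: "done" means the task will make no further progress, not that it produced a value.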

Advantages:

Combining Future with a thread pool lets independent tasks run in parallel on worker threads, which can noticeably shorten total execution time.

import java.util.concurrent.*;

public class FutureThreadPoolDemo {
    // Three tasks: two run on the pool, the third on the main thread
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        long startTime = System.currentTimeMillis();

        FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task1 over";
        });
        threadPool.submit(futureTask1);

        FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task2 over";
        });
        threadPool.submit(futureTask2);

        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Wait for both results before stopping the clock
        System.out.println(futureTask1.get());
        System.out.println(futureTask2.get());
        long endTime = System.currentTimeMillis();
        System.out.println("---costTime " + (endTime - startTime) + " ms");
        System.out.println(Thread.currentThread().getName() + "\t ----end");
        threadPool.shutdown();
    }

    // Baseline: the same three tasks executed one after another on a single thread
    public static void m1() {
        long startTime = System.currentTimeMillis();
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("---costTime " + (endTime - startTime) + " ms");
        System.out.println(Thread.currentThread().getName() + "\t ----end");
    }
}

Disadvantages:

  • get: blocks the calling thread until the result is available, so it is generally best called as late as possible in the program
  • isDone polling: polling is the usual way to fetch a result from a plain Future without blocking, but it wastes CPU cycles and still may not deliver the result promptly

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class FutureApiDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t ---- come in");
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task1 over";
        });
        Thread t1 = new Thread(futureTask1, "t1");
        t1.start();

        // get() does not return until the result is ready, so it easily blocks;
        // it is generally best called as late as possible
//        System.out.println(futureTask1.get());
//        System.out.println(Thread.currentThread().getName() + "\t -- busy with other tasks");

        // Polling alternative: check isDone() every 500 ms instead of blocking in get()
        while (true) {
            if (futureTask1.isDone()) {
                System.out.println(futureTask1.get());
                break;
            } else {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("still processing...");
            }
        }
    }
}
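One common way to soften the blocking behaviour of get() with a plain Future is the timed overload get(timeout, unit), which gives up with a TimeoutException instead of waiting indefinitely. The following is a minimal sketch; the class name FutureTimeoutSketch and the durations are assumptions chosen for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {

    // Runs a task that takes taskMillis and waits at most timeoutMillis for its result.
    static String getWithTimeout(long taskMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(taskMillis);
                return "task over";
            });
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // stop waiting and abandon the task
                return "timed out";
            } catch (InterruptedException | ExecutionException e) {
                return "failed: " + e;
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout(10, 2000));  // task finishes well within the limit
        System.out.println(getWithTimeout(2000, 50));  // task is far slower than the limit
    }
}
```

The caller still blocks for up to the timeout, so this bounds the wait rather than removing it.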

CompletableFuture

CompletableFuture provides a mechanism similar to the observer pattern: when a task finishes, it notifies whichever party is listening.

[Figure: class diagram]

CompletionStage:

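  • A CompletionStage represents one stage of an asynchronous computation; when one stage completes, it may trigger another stage
  • A stage may be triggered by the completion of a single stage, or jointly by several stages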
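The two triggering styles described above might look like this in code. This is only a sketch: the class and method names are hypothetical, and thenApply and thenCombine are just two of the many CompletionStage methods that could illustrate the point.

```java
import java.util.concurrent.CompletableFuture;

class StageTriggerSketch {

    // A single stage triggers the next one: the length is computed once the text is ready.
    static int singleTrigger() {
        return CompletableFuture.supplyAsync(() -> "hello")
                .thenApply(String::length)
                .join();
    }

    // Two stages jointly trigger a third: it runs only after both inputs have completed.
    static String jointTrigger() {
        CompletableFuture<String> left = CompletableFuture.supplyAsync(() -> "left");
        CompletableFuture<String> right = CompletableFuture.supplyAsync(() -> "right");
        return left.thenCombine(right, (l, r) -> l + "+" + r).join();
    }

    public static void main(String[] args) {
        System.out.println(singleTrigger());
        System.out.println(jointTrigger());
    }
}
```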

CompletableFuture:

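  • Introduced in Java 8, CompletableFuture is a powerful extension of Future. It simplifies asynchronous programming, supports a functional style, lets results be handled through callbacks, and provides methods for transforming and combining CompletableFutures
  • It may represent an explicitly completed Future, or it may represent a completion stage (CompletionStage); it supports triggering functions or actions once the computation completes
  • It implements both the Future and CompletionStage interfaces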
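To make the last two points concrete, here is a small sketch (class name assumed) in which one CompletableFuture object is viewed through both interfaces. CompletableFuture.completedFuture creates an instance that is already explicitly completed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;

class CompletableFutureRolesSketch {

    static String describe() {
        // An explicitly completed future: no task runs, the value is simply set
        CompletableFuture<String> done = CompletableFuture.completedFuture("ready");

        // The same object can be used through either interface
        Future<String> asFuture = done;
        CompletionStage<String> asStage = done;

        boolean finished = asFuture.isDone();
        String upper = asStage.thenApply(String::toUpperCase)
                .toCompletableFuture()
                .join();
        return finished + ":" + upper;
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```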

Four commonly used static methods:

  • If no Executor is specified, the asynchronous code runs on the default thread pool, ForkJoinPool.commonPool()
  • If a thread pool is specified, the asynchronous code runs on that custom pool

Method        Returns     Signatures
runAsync      no value    public static CompletableFuture<Void> runAsync(Runnable runnable)
                          public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
supplyAsync   a value     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
                          public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Example code:

import java.util.concurrent.*;

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        // runAsync variant: no return value, so get() yields null
//        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
//            System.out.println(Thread.currentThread().getName());
//            try {
//                TimeUnit.SECONDS.sleep(1);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        }, threadPool);
//        System.out.println(completableFuture.get());

        // supplyAsync variant: returns a value, executed on the custom pool
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello supplyAsync";
        }, threadPool);
        System.out.println(completableFuture.get());
        threadPool.shutdown();
    }
}

Reducing blocking and polling:

A callback can be passed in; when the asynchronous task completes or throws, the callback is invoked automatically.

import java.util.concurrent.*;

public class CompletableFutureUseDemo {
    // Callbacks on a custom thread pool
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "----come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (result > 5) {
                    int i = 10 / 0; // deliberately fail to exercise exceptionally()
                }
                System.out.println("---- result after 1 second: " + result);
                return result;
            }, threadPool).whenComplete((v, e) -> {
                // e is null only when the task completed normally
                if (e == null) {
                    System.out.println("---- computation finished, UpdateValue: " + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("exception: " + e.getCause() + "\t" + e.getMessage());
                return null;
            });
            System.out.println(Thread.currentThread().getName() + " thread goes off to do other work");
        } finally {
            // shutdown() lets the submitted task finish, then allows the JVM to exit
            threadPool.shutdown();
        }
    }

    // Callbacks on the default ForkJoinPool
    public static void future2() {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "----come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("---- result after 1 second: " + result);
            return result;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("---- computation finished, UpdateValue: " + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println("exception: " + e.getCause() + "\t" + e.getMessage());
            return null;
        });
        System.out.println(Thread.currentThread().getName() + " thread goes off to do other work");
        // ForkJoinPool worker threads are daemons: keep main alive so the task is not cut off
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // Old style for comparison: the result is still fetched with a blocking get()
    public static void future1() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "----come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("---- result after 1 second: " + result);
            return result;
        });
        System.out.println(Thread.currentThread().getName() + " thread goes off to do other work");
        System.out.println(completableFuture.get());
    }
}

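Advantages of CompletableFuture:

  • When an asynchronous task finishes, a callback method is invoked automatically
  • Once the main thread has registered its callbacks, it no longer needs to watch the task; asynchronous tasks can also be chained to run in sequence
  • When an asynchronous task fails, a callback method is invoked automatically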
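The second and third points can be illustrated with thenCompose, which starts one asynchronous task after another, and exceptionally, which supplies a fallback when any step fails. The lookups findUserId and findLevel below are hypothetical stand-ins invented for this sketch, not methods from the article or from any library.

```java
import java.util.concurrent.CompletableFuture;

class SequentialStagesSketch {

    // Hypothetical first lookup: derives an id from the name (fails if name is null)
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical second lookup: needs the id produced by the first
    static CompletableFuture<String> findLevel(int userId) {
        return CompletableFuture.supplyAsync(() -> userId > 3 ? "gold" : "silver");
    }

    // The second task starts only after the first has produced its result;
    // if either step throws, exceptionally() provides the fallback value.
    static String levelOf(String name) {
        return findUserId(name)
                .thenCompose(SequentialStagesSketch::findLevel)
                .exceptionally(e -> "unknown")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(levelOf("alice"));
        System.out.println(levelOf("bo"));
        System.out.println(levelOf(null)); // first step throws NullPointerException
    }
}
```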

Use case:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class CompletableFutureMallDemo {
    static List<NetMall> list = Arrays.asList(new NetMall("jd"), new NetMall("dangdang"), new NetMall("taobao"));

    // Sequential: query each mall one after another
    public static List<String> getPrice(List<NetMall> list, String productName) {
        return list.stream()
                .map(it -> String.format("%s in %s price is %.2f",
                        productName, it.getNetMallName(), it.calcPrice(productName)))
                .collect(Collectors.toList());
    }

    // Asynchronous: start one query per mall, then join them all
    public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {
        return list.stream()
                .map(it -> CompletableFuture.supplyAsync(() -> String.format("%s in %s price is %.2f",
                        productName, it.getNetMallName(), it.calcPrice(productName))))
                .collect(Collectors.toList()) // collect first so every task is started before any join
                .stream()
                .map(s -> s.join())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " ms");

        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(list, "mysql");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime2 - startTime2) + " ms");
    }
}

class NetMall {
    private String netMallName;

    public String getNetMallName() {
        return netMallName;
    }

    public NetMall(String netMallName) {
        this.netMallName = netMallName;
    }

    // Simulates a slow remote price lookup
    public double calcPrice(String productName) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

Common Methods

Getting results and triggering completion:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello supplyAsync";
        });
//        System.out.println(completableFuture.get());
//        System.out.println(completableFuture.get(2, TimeUnit.SECONDS));
//        System.out.println(completableFuture.join());

        // getNow: returns the result if already completed, otherwise the given default value
//        System.out.println(completableFuture.getNow("xxx"));

        // complete: if the future is not yet completed, completes it with the given value so that
        // get/join return that value at once; returns true if this call did the completing
        System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.join());
    }
}
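One practical difference between get() and join() is how failures surface: get() declares the checked ExecutionException and InterruptedException, while join() throws the unchecked CompletionException. The sketch below (class and method names assumed) shows both, plus getNow on a future that has not completed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class GetVersusJoinSketch {

    // A future that has already failed with an IllegalStateException
    static CompletableFuture<String> failing() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    // get() forces the caller to handle checked exceptions
    static String viaGet() {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return "ExecutionException: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join() needs no throws clause; the failure arrives as an unchecked exception
    static String viaJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return "CompletionException: " + e.getCause().getMessage();
        }
    }

    // getNow() never waits: an incomplete future yields the supplied default
    static String viaGetNow() {
        return new CompletableFuture<String>().getNow("fallback");
    }

    public static void main(String[] args) {
        System.out.println(viaGet());
        System.out.println(viaJoin());
        System.out.println(viaGetNow());
    }
}
```

The unchecked exception is what makes join() convenient inside stream pipelines and lambdas, where checked exceptions cannot be thrown directly.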

Processing computation results:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureAPI2Demo {
    public static void main(String[] args) {
        // Task B runs after task A and does not need A's result
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
        // Task B runs after task A and needs A's result, but B returns nothing
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
        // Task B runs after task A, needs A's result, and returns a value of its own
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
    }

    // thenAccept: consumes the result and returns nothing
    public static void thenAccept() {
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }).thenApply(f -> {
            return f + 2;
        }).thenApply(f -> {
            return f + 3;
        }).thenAccept((v) -> {
            System.out.println("consumed: " + v);
        });
    }

    // handle: the chain keeps going even when an earlier stage throws
    public static void handle() {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            int i = 10 / 0; // deliberately fail
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(1111);
            return 1;
        }, threadPool).handle((f, e) -> {
            // Still runs after the failure; f is null here, so f + 2 throws in turn
            System.out.println(2222);
            return f + 2;
        }).handle((f, e) -> {
            System.out.println(3333);
            return f + 3;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("---- result: " + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });
        System.out.println(Thread.currentThread().getName() + "---- main thread goes off to do other work");
        threadPool.shutdown();
    }

    // thenApply: if a stage throws, the following stages are skipped
    public static void thenApply() {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(1111);
            return 1;
        }, threadPool).thenApply(f -> {
            System.out.println(2222);
            return f + 2;
        }).thenApply(f -> {
            System.out.println(3333);
            return f + 3;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("---- result: " + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });
        System.out.println(Thread.currentThread().getName() + "---- main thread goes off to do other work");
        threadPool.shutdown();
    }
}

CompletableFuture and thread pools:

  • If no custom thread pool is passed in, everything runs on the default ForkJoinPool
  • If a custom thread pool is passed in for the first task, a second task attached with thenRun shares that same pool. If the second task is attached with thenRunAsync instead, the first task uses the pool you passed in and the second task uses the ForkJoinPool
  • If a task finishes very quickly, the next non-async stage may run directly on the main thread: when the previous stage is already complete by the time the callback is attached, the callback runs on the calling thread

import java.util.concurrent.*;

public class CompletableFutureWithThreadPoolDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        try {
            CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("task 1\t" + Thread.currentThread().getName());
                try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
                return "hello";
            }, threadPool).thenRun(() -> {
                try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("task 2\t" + Thread.currentThread().getName());
            }).thenRunAsync(() -> {
                try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("task 3\t" + Thread.currentThread().getName());
            }).thenRun(() -> {
                try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("task 4\t" + Thread.currentThread().getName());
            });
            System.out.println(completableFuture.get(2, TimeUnit.SECONDS));
        } finally {
            // Without shutdown the pool's non-daemon threads keep the JVM alive
            threadPool.shutdown();
        }
    }
}
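The thread-selection rules above can be checked in isolation. In this sketch (class name and the thread name "custom-pool-thread" are assumptions), the source future is already complete when the callback is attached, so the non-async thenRun runs on the calling thread, while thenRunAsync with an explicit executor runs on that executor's thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreadSketch {

    // Non-async callback on an already completed future: runs on the caller's thread
    static String threadOfSyncCallback() {
        String[] seen = new String[1];
        CompletableFuture.completedFuture("done")
                .thenRun(() -> seen[0] = Thread.currentThread().getName())
                .join();
        return seen[0];
    }

    // Async callback with an explicit executor: runs on that executor's thread
    static String threadOfAsyncCallback() {
        ExecutorService pool = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "custom-pool-thread"));
        try {
            String[] seen = new String[1];
            CompletableFuture.completedFuture("done")
                    .thenRunAsync(() -> seen[0] = Thread.currentThread().getName(), pool)
                    .join();
            return seen[0];
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenRun ran on:      " + threadOfSyncCallback());
        System.out.println("thenRunAsync ran on: " + threadOfAsyncCallback());
    }
}
```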

Choosing the faster result:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureFastDemo {
    public static void main(String[] args) {
        CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("A come in");
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return "playA";
        });
        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("B come in");
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            return "playB";
        });
        // applyToEither: the function receives the result of whichever future completes first
        CompletableFuture<String> result = playA.applyToEither(playB, f -> {
            return f + " is winner";
        });
        System.out.println(Thread.currentThread().getName() + "\t----" + result.join());
    }
}
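applyToEither races exactly two futures. For more than two, CompletableFuture.anyOf offers a similar "first one wins" behaviour, at the cost of returning an untyped Object. The sketch below is an illustration under assumed names and delays, not part of the original article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AnyOfSketch {

    // Helper that sleeps and then returns the given label
    static String slowly(String label, long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return label;
    }

    // Returns the result of whichever task completes first
    static Object firstOf() {
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> slowly("fast", 100));
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> slowly("slow", 2000));
        CompletableFuture<String> slower = CompletableFuture.supplyAsync(() -> slowly("slower", 3000));
        return CompletableFuture.anyOf(fast, slow, slower).join();
    }

    public static void main(String[] args) {
        System.out.println(firstOf() + " is winner");
    }
}
```

The losing tasks are not cancelled; they keep running on the pool's daemon threads until they finish or the JVM exits.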

Combining computation results:

  • Once both CompletionStage tasks have completed, their two results are handed together to thenCombine for processing
  • Whichever task finishes first waits for the other branch

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureCombineDemo {
    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\n" + "started");
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 10;
        });
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\n" + "started");
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return 20;
        });
        // The combining function runs only after both futures have completed
        CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println("------ merging the two results");
            return x + y;
        });
        System.out.println(Thread.currentThread().getName() + "\t----" + result.join());
    }
}
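thenCombine merges exactly two results. When the number of tasks is not fixed, one possible approach is CompletableFuture.allOf, which completes once every given future has completed; the individual results are then read with join, which no longer blocks. The class name and the squaring task below are assumptions made for the sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {

    // Squares every input asynchronously and sums the squares once all tasks are done
    static int sumOfSquares(List<Integer> inputs) {
        List<CompletableFuture<Integer>> futures = inputs.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> n * n))
                .collect(Collectors.toList());

        // allOf yields CompletableFuture<Void>: it signals completion but carries no values
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // By the time this stage runs every future is complete, so join() returns immediately
        return all.thenApply(v -> futures.stream()
                        .mapToInt(CompletableFuture::join)
                        .sum())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3)));
    }
}
```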