> 文章列表 > CompletableFuture解析

CompletableFuture解析

CompletableFuture解析

1、CompletableFuture介绍

CompletableFuture对象是JDK1.8版本新引入的类,这个类实现了两个接口,一个是Future接口,一个是CompletionStage接口。

CompletionStage接口是JDK1.8版本提供的接口,用于异步执行中的阶段处理,其大量用在Lambda表达式计算过程中,CompletionStage定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对结果进行转换产生新的结果等等,一般来说要执行下一个阶段都需要上一个阶段正常完成,当然这个类也提供了对异常结果的处理接口。

2、CompletableFuture的API

2.1 提交任务

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

CompletableFuture解析
这四个方法都是用来提交任务的,不同的是supplyAsync提交的任务有返回值,runAsync提交的任务没有返回值。

两个接口都有一个重载的方法,第二个入参为指定的线程池,如果不指定,则默认使用ForkJoinPool.commonPool()线程池。

2.2 结果转换

2.2.1 thenApply

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

CompletableFuture解析
thenApply这一函数入参是Function,意思是将上一个CompletableFuture执行结果作为入参,再次进行转换或者计算,重新返回一个新的值。

2.2.2 handle

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

CompletableFuture解析
handle这一组函数入参是BiFunction,该函数式接口有两个入参一个返回值,意思是处理上一个CompletableFuture的处理结果,同时如果有异常,需要手动处理异常。

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

CompletableFuture解析

thenRun这一组函数入参是Runnable函数式接口,该接口无需入参和出参,这一组函数是在上一个CompletableFuture任务执行完成后,在执行另外一个接口,不需要上一个任务的结果,也不需要返回值,只需要在上一个任务执行完成后执行即可。

2.2.3 thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

CompletableFuture解析
thenAccept这一组函数的入参是Consumer,该函数式接口有一个入参,没有返回值,所以这一组接口的意思是处理上一个CompletableFuture的处理结果,但是不返回结果。

2.2.4 thenAcceptBoth

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

CompletableFuture解析

thenAcceptBoth这一组函数入参包括CompletionStage以及BiConsumer,CompletionStage是JDK1.8新增的接口,在JDK中只有一个实现类:CompletableFuture,所以第一个入参就是CompletableFuture,这一组函数是用来接受两个CompletableFuture的返回值,并将其组合到一起。BiConsumer这个函数式接口有两个入参,并且没有返回值,BiConsumer的第一个入参就是调用方CompletableFuture的执行结果,第二个入参就是thenAcceptBoth接口入参的CompletableFuture的执行结果。所以这一组函数意思是将两个CompletableFuture执行结果合并到一起。

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

2.2.5 thenCombine

CompletableFuture解析
thenCombine这一组函数和thenAcceptBoth类似,入参都包含一个CompletionStage,也就是CompletableFuture对象,意思也是组合两个CompletableFuture的执行结果,不同的是thenCombine的第二个入参为BiFunction,该函数式接口有两个入参,同时有一个返回值。所以与thenAcceptBoth不同的是,thenCombine将两个任务结果合并后会返回一个全新的值作为出参。

下面是thenCombine的的测试代码。

public static void testCombine() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 2021);CompletableFuture<String> result = future.thenCompose(k -> {return CompletableFuture.supplyAsync(() -> {return String.valueOf(k) + "-年";});});// 输出结果:2021-年System.out.println(result.get());}

2.2.6 thenCompose

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

CompletableFuture解析
thenCompose这一组函数意思是将调用方的执行结果作为Function函数的入参,同时返回一个新的CompletableFuture对象。

2.3 回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

CompletableFuture解析

**whenComplete**方法意思是当上一个CompletableFuture对象任务执行完成后执行该方法。BiConsumer函数式接口有两个入参没有返回值,这两个入参第一个是CompletableFuture任务的执行结果,第二个是异常信息。表示处理上一个任务的结果,如果有异常,则需要手动处理异常,与handle方法的区别在于,handle方法的BiFunction是有返回值的,而BiConsumer是没有返回值的。以上方法都有一个带有Async的方法,带有Async的方法表示是异步执行的,会将该任务放到线程池中执行,同时该方法会有一个重载的方法,最后一个参数为Executor,表示异步执行可以指定线程池执行。为了方便进行控制,最好在使用CompletableFuture时手动指定我们的线程池。

2.4 异常处理

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

CompletableFuture解析
exceptionally是用来处理异常的,当任务抛出异常后,可以通过exceptionally来进行处理,也可以选择使用handle来进行处理,不过两者有些不同,hand是用来处理上一个任务的结果,如果有异常情况,就处理异常。而exceptionally可以放在CompletableFuture处理的最后,作为兜底逻辑来处理未知异常。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

CompletableFuture解析

allOf与anyOf的区别在于,allOf是需要入参中所有的CompletableFuture任务执行完成,才会进行下一步,anyOf是入参中任何一个CompletableFuture任务执行完成都可以执行下一步。

2.5 获取结果

public T get() throws InterruptedException, ExecutionException
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
public T getNow(T valueIfAbsent)
public T join()

CompletableFuture解析

get方法一个是不带超时时间的,一个是带有超时时间的。而getNow方法则是立即返回结果,如果还没有结果,则返回默认值,也就是该方法的入参。

3、CompletableFuture原理

join方法同样表示获取结果,但是join与get方法有什么区别呢。

public T join() {Object r;return reportJoin((r = result) == null ? waitingGet(false) : r);
}public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r = result) == null ? waitingGet(true) : r);
}public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {Object r;long nanos = unit.toNanos(timeout);return reportGet((r = result) == null ? timedGet(nanos) : r);
}public T getNow(T valueIfAbsent) {Object r;return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

以上是CompletableFuture类中两个方法的代码,可以看到两个方法几乎一样。区别在于reportJoin/reportGet,waitingGet方法是一致的,只不过参数不一样,我们在看下reportJoin与reportGet方法。

private static <T> T reportGet(Object r)throws InterruptedException, ExecutionException {if (r == null) // by convention below, null means interruptedthrow new InterruptedException();if (r instanceof AltResult) {Throwable x, cause;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw (CancellationException)x;if ((x instanceof CompletionException) &&(cause = x.getCause()) != null)x = cause;throw new ExecutionException(x);}@SuppressWarnings("unchecked") T t = (T) r;return t;}
private static <T> T reportJoin(Object r) {if (r instanceof AltResult) {Throwable x;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw (CancellationException)x;if (x instanceof CompletionException)throw (CompletionException)x;throw new CompletionException(x);}@SuppressWarnings("unchecked") T t = (T) r;return t;}

可以看到这两个方法很相近,reportGet方法判断了r对象是否为空,并抛出了中断异常,而reportJoin方法没有判断,同时reportJoin抛出的都是运行时异常,所以join方法也是无需手动捕获异常的。

我们在看下waitingGet方法

private Object waitingGet(boolean interruptible) {Signaller q = null;boolean queued = false;int spins = -1;Object r;while ((r = result) == null) {if (spins < 0)spins = SPINS;else if (spins > 0) {if (ThreadLocalRandom.nextSecondarySeed() >= 0)--spins;}else if (q == null)q = new Signaller(interruptible, 0L, 0L);else if (!queued)queued = tryPushStack(q);else if (interruptible && q.interruptControl < 0) {q.thread = null;cleanStack();return null;}else if (q.thread != null && result == null) {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interruptControl = -1;}}}if (q != null) {q.thread = null;if (q.interruptControl < 0) {if (interruptible)r = null; // report interruptionelseThread.currentThread().interrupt();}}postComplete();return r;}

该方法是通过while的方式循环判断是否任务已经完成并产生结果,这里需要注意的是

CompletableFuture解析
在这里初始化了一下spins=-1,当第一次进入while循环的时候,spins是-1,这时会将spins赋值为一个常量,该常量为

CompletableFuture解析
这里判断可用CPU数是否大于1,如果大于1,则该常量为 1<< 8,也就是256,否则该常量为0。

第二次进入while循环的时候,spins是256大于0,这里做了减一的操作,下次进入while循环,如果还没有结果,依然是大于0继续做减一的操作,此处用来做短时间的自旋,只有当spins等于0,后续会进入正常流程判断。

我们在看下timedGet方法的源码

private Object timedGet(long nanos) throws TimeoutException {if (Thread.interrupted())return null;if (nanos <= 0L)throw new TimeoutException();long d = System.nanoTime() + nanos;Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0boolean queued = false;Object r;// We intentionally don't spin here (as waitingGet does) because// the call to nanoTime() above acts much like a spin.while ((r = result) == null) {if (!queued)queued = tryPushStack(q);else if (q.interruptControl < 0 || q.nanos <= 0L) {q.thread = null;cleanStack();if (q.interruptControl < 0)return null;throw new TimeoutException();}else if (q.thread != null && result == null) {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interruptControl = -1;}}}if (q.interruptControl < 0)r = null;q.thread = null;postComplete();return r;}

我们可以看到timedGet方法中没有自旋的那部分代码,源码注释中也说明了原因。
CompletableFuture解析

4、CompletableFuture实现多线程任务

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {// 自定义线程池ExecutorService executorService = new MdcThreadPoolExecutor(5, 10,60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100));// 集合保存future对象List<CompletableFuture<Boolean>> futures = new ArrayList<>();for (int i = 0; i < 10; i++) {int finalI = i;CompletableFuture<Boolean> future = CompletableFuture// 提交任务到指定线程池.supplyAsync(() -> {System.out.println("第" + finalI + "个任务");return Boolean.TRUE;}, executorService)// 任务执行异常时处理异常.exceptionally(e -> {System.out.println("任务执行异常!" + e.getMessage());e.printStackTrace();return Boolean.FALSE;});// future对象添加到集合中futures.add(future);}// 等待所有任务执行完成,此处最好加超时时间CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);System.out.println("任务全部执行完成!");}