> 文章列表 > Callable和FutureTask可能会创建多次。但是FutureTask能 够在⾼并发环境下确保任务只执⾏⼀次,这是怎么实现的?

Callable和FutureTask可能会创建多次。但是FutureTask能 够在⾼并发环境下确保任务只执⾏⼀次,这是怎么实现的?

Callable和FutureTask可能会创建多次。但是FutureTask能 够在⾼并发环境下确保任务只执⾏⼀次,这是怎么实现的?

先说原理:

FutureTask能够在高并发环境下确保任务只执行一次的原理是通过使用volatile和CAS(Compare and Swap)实现的。

具体来说,FutureTask在执行任务前,会先检查任务状态,如果状态为NEW,则表示任务还未执行,可以执行任务。而执行任务的过程中,会先判断当前状态是否为NEW,如果是,则将状态更新为COMPLETING,并执行任务,最后将任务结果保存到outcome变量中,并将状态更新为NORMAL或EXCEPTIONAL。因此,如果多个线程同时调用FutureTask的get()方法获取任务结果,只有第一个调用get()方法的线程会执行任务,其他线程会被阻塞,直到任务执行完成。

当第一个线程执行任务完成后,它会将任务状态更新为NORMAL或EXCEPTIONAL,并将outcome变量保存任务结果。此时,其他被阻塞的线程会被唤醒,它们会再次检查任务状态,发现任务状态已经不是NEW了,就直接返回保存在outcome变量中的任务结果。因此,如果多个线程同时调用FutureTask的get()方法获取任务结果,只有第一个调用get()方法的线程会执行任务,其他线程会被阻塞,但是它们都可以获取到任务执行结果,而不会重复执行任务。

在实现过程中,为了确保状态更新的原子性,FutureTask使用了CAS操作。在状态更新的过程中,先检查当前状态是否为s,如果是,则将状态更新为ns;否则,说明当前状态已经被其他线程修改过了,不需要再更新了。CAS操作保证了状态的原子性,避免了多个线程同时修改状态造成的并发问题。

此外,FutureTask中还使用了volatile关键字,保证了状态变量的可见性,确保了不同线程之间对状态的操作是同步的。

综上所述,FutureTask能够在高并发环境下确保任务只执行一次的原理是通过使用volatile和CAS实现的。在执行任务之前,会检查任务状态是否为NEW,如果是,则可以执行任务;在执行任务的过程中,会将任务状态更新为COMPLETING,并将任务结果保存到outcome变量中,最后将任务状态更新为NORMAL或EXCEPTIONAL。多个线程同时调用get()方法获取任务结果时,只有第一个调用get()方法的线程会执行任务,其他线程会被阻塞,直到任务执行完成,并且它们都可以获取到任务执行结果,而不会重复执行任务。

以下是FutureTask的关键源码,其中涉及到的关键注释已经写在代码里面了:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.LockSupport;import static java.util.concurrent.CompletableFuture.WAITERS;public class FutureTask<V> implements RunnableFuture<V> {//任务状态private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;//任务结果private Object outcome;//执行任务的Callableprivate Callable<V> callable;//执行任务的线程private volatile Thread runner;//等待任务完成的线程队列private volatile WaitNode waiters;public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;}//获取任务结果public V get() throws InterruptedException, ExecutionException {//如果任务状态为NEW,则表示任务还未执行,需要执行任务if (state == NEW)//执行任务run();//等待任务执行完成,并返回任务结果return reportOutcome();}//执行任务public void run() {//只有在任务状态为NEW时才能执行任务if (state != NEW ||//使用CAS将任务状态从NEW更新为COMPLETING!U.compareAndSwapInt(this, STATE, NEW, COMPLETING))return;try {Callable<V> c = callable;if (c != null && state == COMPLETING) {//执行任务,并将任务结果保存到outcome变量中V result = c.call();//使用CAS将任务状态从COMPLETING更新为NORMAL,并将任务结果保存到outcome变量中if (U.compareAndSwapInt(this, STATE, COMPLETING, NORMAL)) {outcome = result;//唤醒等待任务完成的线程releaseWaiters();}}} catch (Throwable ex) {//使用CAS将任务状态从COMPLETING更新为EXCEPTIONAL,并将异常保存到outcome变量中if (U.compareAndSwapInt(this, STATE, COMPLETING, EXCEPTIONAL)) {outcome = ex;//唤醒等待任务完成的线程releaseWaiters();}}}//等待任务执行完成,并返回任务结果private V reportOutcome() throws InterruptedException, ExecutionException {int s = state;//如果任务状态为NORMAL,则返回任务结果if (s == NORMAL)return (V) outcome;//如果任务状态为EXCEPTIONAL,则抛出异常if (s == EXCEPTIONAL)throw new ExecutionException((Throwable) outcome);//任务状态为COMPLETING,表示任务正在执行中,需要等待任务完成synchronized (this) {while (state == COMPLETING) {//将当前线程加入等待任务完成的线程队列中waiters = new WaitNode();Thread.yield();}}//任务执行完成后,返回任务结果或抛出异常return reportOutcome();}//唤醒等待任务完成的线程private void releaseWaiters() {WaitNode q;//将等待任务完成的线程队列中的所有线程都唤醒,并从队列中移除这些线程while ((q = waiters) != null) {if (U.compareAndSwapObject(this, WAITERS, q, null)) {for (; ; ) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null;q = next;}break;}}}//等待任务完成的线程节点static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() {thread = Thread.currentThread();}}
}

注释已经写得比较详细了,主要实现原理如下:

1. 任务状态通过volatile修饰,保证线程间可见。

2. 在get()方法中,如果任务状态为NEW,则表示任务还未执行,需要执行任务。执行任务时,使用CAS将任务状态从NEW更新为COMPLETING,并执行Callable中的call()方法,将结果保存到outcome变量中。执行完成后,使用CAS将任务状态从COMPLETING更新为NORMAL或EXCEPTIONAL,并唤醒等待任务完成的线程。

3. 在reportOutcome()方法中,如果任务状态为NORMAL,则返回任务结果。如果任务状态为EXCEPTIONAL,则抛出异常。如果任务状态为COMPLETING,表示任务正在执行中,需要等待任务完成。使用synchronized将当前线程加入等待任务完成的线程队列中,然后进入wait()状态等待任务完成。

4. 在run()方法中,只有在任务状态为NEW时才能执行任务。执行任务时,使用CAS将任务状态从NEW更新为COMPLETING,并执行Callable中的call()方法,将结果保存到outcome变量中。执行完成后,使用CAS将任务状态从COMPLETING更新为NORMAL或EXCEPTIONAL,并唤醒等待任务完成的线程。

5. 唤醒等待任务完成的线程时,使用volatile修饰的waiters变量保存等待任务完成的线程队列。使用CAS将waiters变量置为null,并遍历等待任务完成的线程队列,将所有线程都唤醒。唤醒线程时,使用LockSupport.unpark()方法唤醒线程,唤醒后将该节点从等待任务完成的线程队列中移除。

通过上述实现,FutureTask能够在高并发环境下确保任务只执行一次。如果多个线程同时调用get()方法,只有一个线程会执行任务,其他线程会进入等待状态。当任务执行完成后,所有等待任务完成的线程都会被唤醒并返回任务结果或抛出异常。