> 文章列表 > 【并发编程】Condition源码分析

【并发编程】Condition源码分析

【并发编程】Condition源码分析

Condition源码分析

await()

  1. 释放锁
  2. 释放锁的线程,应该被阻塞。
  3. 被阻塞之后该线程要添加到等待队列中。
  4. 被唤醒后,该线程要重新去竞争锁。->AQS的逻辑
  5. 要能够处理interupt()的中断响应。
 public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//添加到等待队列Node node = addConditionWaiter();//完整的释放锁(考虑重入问题)int savedState = fullyRelease(node);int interruptMode = 0;//判断当前线程节点是否在同步队列中,如果不在同步队列中,则说明这个线程还不具备抢锁的资格,则阻塞它(阻塞后先看下signal的实现)while (!isOnSyncQueue(node)) {LockSupport.park(this); //阻塞当前线程(当其他线程调用signal()方法时,该线程会从这个位置去执行)//要判断当前被阻塞的线程是否是因为interrupt()唤醒if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}//重新竞争锁,savedState表示的是被释放的锁的重入次数.if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
    //因为这个代码是在获取锁之后执行的,所以不需要考虑线程安全的问题,单向链表(从尾部插入)private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
final int fullyRelease(AbstractQueuedSynchronizer.Node node) {boolean failed = true;try {//获取到节点状态,也就是持有锁的数量int savedState = getState();//然后释放锁,注意这个会唤醒处于AQS队列中的线程,告诉他们可以抢锁啦if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = AbstractQueuedSynchronizer.Node.CANCELLED;}}
// 判断当前线程节点是否在同步队列中(这里的Node是构建的CONDITION状态的节点,一开始构建的时候其next==null)
final boolean isOnSyncQueue(Node node) {// 如果等待状态是CONDITION,或者前一个指针为空,说明还没有移到AQS的队列中,返回falseif (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 如果next指针有值,说明已经移到AQS的队列中了if (node.next != null) // If has successor, it must be on queuereturn true;// 到这里说明node.waitStatus不为Node.CONDITION且node.prev不为null// 从AQS的尾节点开始往前寻找看是否可以找到当前节点,找到了也说明已经在AQS的队列中了return findNodeFromTail(node);
}

signal()

  1. 要把被阻塞的线程,先唤醒(signal、signalAll)
  2. 把等待队列中被唤醒的线程转移到AQS队列中
    public final void signal() {// 如果不是当前线程占有着锁,调用这个方法抛出异常,说明signal()也要在获取锁之后执行if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter; //得到当前的等待队列的头节点(单向链表)if (first != null)doSignal(first);}//唤醒等待队列中的一个线程private void doSignal(Node first) {//从头节点开始遍历等待队列,仅唤醒第一个符合条件的线程do {//如果first.nextWaiter为null,说明已到队列尾部,将lastWaiter设置为nullif ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;//将first与其后继节点断开,相当于把头节点从队列中出队first.nextWaiter = null;//转移节点到AQS队列中// 条件1:只要一个线程转移到AQS同步队列成功,transferForSignal返回true,就会终止该循环(如果移动失败则会找下一个节点)// 条件2:用于判断是否遍历到了队列尾部,到了队尾也会终止循环} while (!transferForSignal(first) &&(first = firstWaiter) != null);}
// 将节点从等待队列移动到AQS同步队列,返回移动是否成功
final boolean transferForSignal(Node node) {// 把节点的状态更改为0,也就说准备移动到AQS队列,如果CAS失败,说明该节点已经为取消状态了if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 调用AQS的入队方法把节点移到AQS的队列中,这里enq()的返回值是node的上一个节点Node p = enq(node);// node的前驱节点的状态int ws = p.waitStatus;//如果前驱节点为取消状态,或者更新状态为SIGNAL失败,就直接唤醒当前节点对应的线程if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))// node.thread会从await()里的park处被唤醒,因为已经在同步队列中了,会结束while循环执行AQS.acquireQueued方法尝试获取锁LockSupport.unpark(node.thread);// 如果更新上一个节点的状态为SIGNAL成功,则退出循环,也就是只将一个节点移动到AQS队列,此时当前节点还是阻塞状态return true;

Q: 为什么只要符合这个条件ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL),就要执行 LockSupport.unpark(node.thread);来直接将线程节点唤醒呢?
A: 因为执行到这个条件时,当前的node节点已经在AQS的同步队列中了,然后出现这种情况,就说明node在同步队列中的前一个结点可能被取消了(其waitStatus状态已经不是SIGNAL,而是 CANCELLED了)。
此时如果依然保持当前节点阻塞状态,因为它的前驱节点不是SIGNAL,就可能导致当前node节点无法正常被唤醒的情况。
所以这里安全起见,就先将当前node节点唤醒,调用acquireQueued(node, savedState)去抢锁,如果抢不到锁当前节点也还是会进入AQS同步队列进行阻塞

再次回到await()方法

 public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);//判断等待过程中是否被中断过if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();//如果interruptMode不为0,则有发生中断事件if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
  private int checkInterruptWhileWaiting(Node node) {// 通过Thread.interrupted()来判断该线程是不是被中断过,如果没有被中断则返回0//如果被中断了,则通过transferAfterCancelledWait(node)来判断该线程是在其他线程调用signal()前被中断,还是调用signal()后被中断//如果是在其他线程调用signal前被中断则返回THROW_IE表return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}final boolean transferAfterCancelledWait(AbstractQueuedSynchronizer.Node node) {//如果node.waitStatus的值是Node.CONDITION,则说明是在signal之前被中断的 举例说明(1)if (compareAndSetWaitStatus(node, AbstractQueuedSynchronizer.Node.CONDITION, 0)) {//被中断后就需要重新进入同步队列抢占锁,从头再来enq(node);return true;}/ If we lost out to a signal(), then we can't proceed* until it finishes its enq().  Cancelling during an* incomplete transfer is both rare and transient, so just* spin.*///执行到这里说明其他线程调用了signal(),当前线程不是CONDITION状态,需要自旋等待入列完成while (!isOnSyncQueue(node))//主动让出当前线程的CPU时间片Thread.yield();return false;}private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();}

(1)该条件为false的情形:
假设有两个线程threadA、threadB,threadA持有锁后调用await()被park,
threadB获取到锁后调用signal唤醒threadA,A会执行到transferForSignal的第一个if处,将node.waitStatus修改为0,所以这里CAS会失败

电工网