> 文章列表 > JUC并发编程之ReentrantLock

JUC并发编程之ReentrantLock

JUC并发编程之ReentrantLock

JUC并发编程之ReentrantLock

1. 非公平锁实现原理

加锁解锁流程

构造器默认实现的是非公平锁

    public ReentrantLock() {sync = new NonfairSync();}

NonfairSync 继承 Sync, Sync 继承 AbstractQueuedSynchronizer

没有竞争时

JUC并发编程之ReentrantLock

第一个竞争出现时

JUC并发编程之ReentrantLock

Thread-1 执行了

  1. CAS 尝试将state 由 0 改为 1,结果失败
  2. 进入 tryAcquire的逻辑,这时state已经是1,结果仍然失败
  3. 接下来进入 addWaiter 逻辑,构造Node队列
    • 图中黄色三角形表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
    • Node 的创建是懒惰的
    • 其中第一个 Node 称为 Dummy(哑元)或者哨兵,用来占位,并不关联线程

JUC并发编程之ReentrantLock

当前线程进入 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获取锁,失败后进入park阻塞

  2. 如果自己是紧邻着 head(排第二位),那么再次tryAcquire 尝试获取锁,当然这时state仍为 1,失败

  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node, 即 head 的 waitStatus 改为 -1,这次返回false

JUC并发编程之ReentrantLock

  1. shouldParkAfterFailedAcquire 执行完毕后回到 acquireQueued,再次 tryAcquire尝试获取锁,这时state 仍为1,失败

  2. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 WaitStatus 已经是 -1,这次返回的是 true

  3. 进入 parkAndCheckInterrupt,Thread-1 park(灰色)

JUC并发编程之ReentrantLock

再次有多个线程经历上述过程竞争失败,变成这个样子

JUC并发编程之ReentrantLock

Thread-0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 setExclusiveOwnerThread 为 null
  • state = 0

JUC并发编程之ReentrantLock

当前队列不为 null,并且head的 waiteStatus = -1,进入 unparkSuccessor 逻辑

  • 找到队列中离 head 最近的一个Node(没取消的),unpark 恢复其运行,即Thread-1
  • 回到 Thread-1 的 acquireQueued 流程

JUC并发编程之ReentrantLock

如果加锁成功(没有竞争),会设置

  • 设置 exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1所在的Node,该Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其他线程来竞争(非公平锁)

JUC并发编程之ReentrantLock

如果又被 Thread-4 尝试加锁成功

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

加锁源码

    // Sync 继承自 AQSstatic final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/* Performs lock.  Try immediate barge, backing up to normal* acquire on failure.*/// 加锁实现final void lock() {// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else// 如果尝试失败,进入 ㈠acquire(1);}// tryAcquireprotected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}// (一)从AQS继承过来的方法public final void acquire(int arg) {// tryAcquireif (!tryAcquire(arg) &&// 当 tryAcquire 返回 false时,调用 addWaiter (四),接着acquireQueued(五)acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}// 从Sync 继承过来的方法final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 如果没有获if (c == 0) {// 尝试用 cas 获得,这里体现了非公平性:不检查AQS队列if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果已经获得了锁,线程还是当前线程,表示发生了锁重入else if (current == getExclusiveOwnerThread()) {// state++int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}// 获取失败,回到调用处return false;}// (四) AQS 继承过来的方法private Node addWaiter(Node mode) {// 将当前线程关联到一个Node 对象上,模式为独占模式Node node = new Node(Thread.currentThread(), mode);// 如果 tail 不等于 null, cas尝试将 Node 对象加入 AQS 队列尾部Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {// 双向链表pred.next = node;return node;}}// 尝试将 Node 加入 AQS,进入(六)enq(node);return node;}// (六) AQS 继承过来的方法private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // 还没有,设置head为哨兵节点(不对应线程,状态为0)if (compareAndSetHead(new Node()))tail = head;} else {// cas 尝试将 Node 对象加入 AQS 队列尾部node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}// (五) AQS继承过来的方法final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// 上一个节点是 head, 表示轮到自己了(当前线程对应的node),尝试获取if (p == head && tryAcquire(arg)) {// 获取成功,设置自己(当前线程对应的 node) 为headsetHead(node);// 上一个节点 help gcp.next = null; failed = false;// 返回中断标记 falsereturn interrupted;}if (// 判断是否应当park,进入(七)shouldParkAfterFailedAcquire(p, node) &&// park等待,此时 Node 的状态被置为 Node.SIGNAL   SIGNAL = -1(八)  parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}// (七) AQS继承过来的方法private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取上一个节点的状态(上面图中黄色三角形的值)int ws = pred.waitStatus;if (ws == Node.SIGNAL)// 如果上一个节点都在阻塞,那么自己也阻塞return true;// > 0 表示取消状态if (ws > 0) {// 上一个节点取消,那么重构删除前面所有取消的节点,返回到外层循环重试do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 这次还没有阻塞// 但下次如果重试不成功,则需要阻塞,这时需要设置上一个节点状态为 Node。SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}	// (八)阻塞当前线程private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}	
  • 是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的 waitStatus 决定

解锁源码

    // 解锁实现public void unlock() {sync.release(1);}// AQS 继承过来的方法public final boolean release(int arg) {// 尝试释放锁,进入(一)if (tryRelease(arg)) {// 队列头结点 unparkNode h = head;if (// 队列不为 nullh != null && // waitStatus == Node.SIGNAL 才需要 unparkh.waitStatus != 0)// unpark AQS中等待的线程 进入 (二)unparkSuccessor(h);return true;}return false;}// (一)Sync 继承过来的方法 protected final boolean tryRelease(int releases) {// state --int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 支持锁重入,只有 state 减为0,才释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}// (二) AQS继承过来的方法private void unparkSuccessor(Node node) {// 如果状态为 Node.SIGNAL 尝试重置状态为 0 // 不成功也可以int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 找到需要 unpark 的节点,但本节点从 AQS 队列中脱离,是由唤醒节点完成的Node s = node.next;// 不考虑已取消的节点,从 AQS 队列从后至前找到队列最前面需要 unpark 的节点if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

2. 可重入原理

        protected final boolean tryRelease(int releases) {// state --int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 支持锁重入,只有state减为0的时候才会释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 支持锁重入,当 state不为0,即已经获得了锁,线程还是当前线程,表示发生了锁重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

3. 可打断原理

不可打断模式

在此模式下,即使它被打断,仍会驻留在AQS队列中,一直等到获得锁后方能得知自己被打断了

    private final boolean parkAndCheckInterrupt() {// 如果打断标记已经是true,则park会失效LockSupport.park(this);// interrupted 会清除打断标记return Thread.interrupted();}final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;// 获得锁后,才能返回打断状态return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())// 如果是因为 interrupt 被唤醒, 返回打断状态为 trueinterrupted = true;}} finally {if (failed)cancelAcquire(node);}}public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 如果打断状态为 trueselfInterrupt();}static void selfInterrupt() {// 重新产生一次中断Thread.currentThread().interrupt();}

可打断模式

    public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 如果没有获得锁,进入(一)if (!tryAcquire(arg))doAcquireInterruptibly(arg);}// (一)可打断的获取锁流程private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())// 在 park 过程中如果被 interrupt 会进入此// 这时候抛出异常,而不会在进入 for(;;;)throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

4. 公平锁实现原理

    static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {//(一)acquire(1);}// (二) 与非公平锁的区别就在tryAcquire 方法实现的不同protected final boolean tryAcquire (int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (// (三)先检查AQS队列中是否有前驱节点,没有才会去竞争!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}// (一)从AQS 继承的public final void acquire(int arg) {if (// (二)!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){selfInterrupt();}}// (三)public final boolean hasQueuedPredecessors() {Node t = tail; Node h = head;Node s;// h != t 时 表示队列中有 Nodereturn h != t &&// (s = h.next) == null 表示队列中还有没有别的线程// s.thread != Thread.currentThread() 队列中还有别的线程但不是此线程((s = h.next) == null || s.thread != Thread.currentThread());}

5. 条件变量实现原理

每个条件变量其实对应一个等待队列,其实现类是 ConditionObject

await 流程

开始 Thread - 0 持有锁,调用 await,进入ConditionObject 的 addConditionWaiter 流程

创建新的 Node状态为 -2 (Node.CONDITION),关联 Thread-0,加入等待队列尾部

JUC并发编程之ReentrantLock

接下来进入 AQS 的 fullyRelease 流程, 释放同步器上的锁

JUC并发编程之ReentrantLock

unpark AQS队列中的下一个节点, 竞争锁,假设没有其他竞争线程,那么Thread-1 竞争成功

JUC并发编程之ReentrantLock

park 阻塞 Thread-0

JUC并发编程之ReentrantLock

signal 流程

假设 Thread-1 要来唤醒 Thread-0

JUC并发编程之ReentrantLock

进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0所在 Node

JUC并发编程之ReentrantLock

执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0, Thread-3 的waitStatus 改为 -1

JUC并发编程之ReentrantLock

Thread-1释放锁,进入 unlock 流程

源码

    public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;// 第一个等待节点 private transient Node firstWaiter;// 最后一个等待节点private transient Node lastWaiter;public ConditionObject() { }// (一) 添加一个 Node 至等待队列private Node addConditionWaiter() {Node t = lastWaiter;// 所有已取消的 Node 从队列链表删除 if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 创建一个关联当前线程新的Node,添加至队列尾部Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}//(二)删除已取消的Nodeprivate void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}}// 唤醒 - 将没取消的第一个节点转移至 AQS 队列private void doSignal(Node first) {do {// 已经是尾节点if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (// 将等待队列中的 Node 转移至 AQS 队列,不成功且还有节点则继续循环(三)!transferForSignal(first) &&(first = firstWaiter) != null);}// (三) 外部方法,方便阅读,放在这// 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功final boolean transferForSignal(Node node) {// 如果状态已经不是 Node.CONDITION,说明被取消了if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 加入 AQS 队列尾部Node p = enq(node);int ws = p.waitStatus;if (// 上一个节点被取消ws > 0 || // 上一个节点不能设置状态为 Node.SIGNAL!compareAndSetWaitStatus(p, ws, Node.SIGNAL)){// unpark 取消阻塞,让线程重新同步状态LockSupport.unpark(node.thread);}return true;}// 全部唤醒 - 等待队列的所有节点转移至 AQS 队列private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}// 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}// 全部唤醒- 必须持有锁才能唤醒, 因此 doSignalAll内无需考虑锁public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}// 不可打断等待 - 直到被唤醒public final void awaitUninterruptibly() {// 添加一个 Node 至等待队列 (一)Node node = addConditionWaiter();// 释放节点所持有的锁 (四)int savedState = fullyRelease(node);boolean interrupted = false;// 如果该节点还没有转移至 AQS 队列,阻塞while (!isOnSyncQueue(node)) {// park 阻塞LockSupport.park(this);// 如果被打断,仅设置打断状态if (Thread.interrupted())interrupted = true;}// 唤醒后,尝试竞争锁,如果失败进入 AQS 队列if (acquireQueued(node, savedState) || interrupted)selfInterrupt();}//(四)因为某线程可能重入,需要将 state 全部释放final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}// 打断模式 - 在退出等待时重新设置打断状态private static final int REINTERRUPT = 1;// 打断模式 - 在退出等待时抛出异常private static final int THROW_IE = -1;// 判断打断模式private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}// (五) 应用打断模式private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();}// 等待 - 直到被唤醒或打断public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 添加一个 Node 至等待队列, 见 ㈠Node node = addConditionWaiter();// 释放节点持有的锁int savedState = fullyRelease(node);int interruptMode = 0;// 如果该节点还没有转移至 AQS 队列, 阻塞while (!isOnSyncQueue(node)) {// 阻塞LockSupport.park(this);// 如果被打断,推出等待队列if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 退出等待队列,还需要获得 AQS 队列的锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 所有已取消的 Node 从队列链表删除 (二)if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();// 应用打断模式 (五)if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}// 等待 - 直到被唤醒或打断或超时public final long awaitNanos(long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 添加一个 Node 至等待队列, 见 ㈠Node node = addConditionWaiter();// 释放节点持有的锁  (四)int savedState = fullyRelease(node);// 获得最后期限final long deadline = System.nanoTime() + nanosTimeout;int interruptMode = 0;// 如果该节点还没有转移至 AQSwhile (!isOnSyncQueue(node)) {// 已超时,推出等待队列if (nanosTimeout <= 0L) {transferAfterCancelledWait(node);break;}// park 阻塞一定时间, spinForTimeoutThreshold 为 1000 nsif (nanosTimeout >= spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);// 如果被打断, 退出等待队列if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;nanosTimeout = deadline - System.nanoTime();}// 退出等待队列后, 还需要获得 AQS 队列的锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 所有已取消的 Node 从队列链表删除, 见(二)if (node.nextWaiter != null)unlinkCancelledWaiters();// 应用打断模式, 见 (五)if (interruptMode != 0)reportInterruptAfterWait(interruptMode);return deadline - System.nanoTime();}// 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanospublic final boolean awaitUntil(Date deadline)throws InterruptedException {}// 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanospublic final boolean await(long time, TimeUnit unit)throws InterruptedException {}// 略}