ReentrantLock原理--非公平锁、可重入、可打断性
1.非公平锁实现原理
加锁解锁流程
先从构造器开始看,默认为非公平锁实现
public ReentrantLock() {sync = new NonfairSync();
}
NonfairSync 继承自 AQS
没有竞争时
第一个竞争出现时
Thread-1 执行了
-
CAS 尝试将 state 由 0 改为 1,结果失败
-
进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
-
接下来进入 addWaiter 逻辑,构造 Node 队列
-
图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
-
Node 的创建是懒惰的
-
其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
-
当前线程进入 acquireQueued 逻辑
-
acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
-
如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
-
进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
-
shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
-
当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true
-
进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
再次有多个线程经历上述过程竞争失败,变成这个样子
Thread-0 释放锁,进入 tryRelease 流程,如果成功
- 设置 exclusiveOwnerThread 为 null
- state = 0
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置
- exclusiveOwnerThread 为 Thread-1,state = 1
- head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
- 原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
- Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码
// Sync 继承自 AQS
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;// 加锁实现final void lock() {// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else// 如果尝试失败,进入 ㈠acquire(1);}// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处public final void acquire(int arg) {// ㈡ tryAcquire if (!tryAcquire(arg) &&// 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {selfInterrupt();}}// ㈡ 进入 ㈢protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}// ㈢ Sync 继承过来的方法, 方便阅读, 放在此处final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 如果还没有获得锁if (c == 0) {// 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入else if (current == getExclusiveOwnerThread()) {// state++int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}// 获取失败, 回到调用处return false;}// ㈣ AQS 继承过来的方法, 方便阅读, 放在此处private Node addWaiter(Node mode) {// 将当前线程关联到一个 Node 对象上, 模式为独占模式Node node = new Node(Thread.currentThread(), mode);// 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {// 双向链表pred.next = node;return node;}}// 尝试将 Node 加入 AQS, 进入 ㈥enq(node);return node;}// ㈥ AQS 继承过来的方法, 方便阅读, 放在此处private Node enq(final Node node) {for (; ; ) {Node t = tail;if (t == null) {// 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)if (compareAndSetHead(new Node())) {tail = head;}} else {// cas 尝试将 Node 对象加入 AQS 队列尾部node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}// ㈤ AQS 继承过来的方法, 方便阅读, 放在此处final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (; ; ) {final Node p = node.predecessor();// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取if (p == head && tryAcquire(arg)) {// 获取成功, 设置自己(当前线程对应的 node)为 headsetHead(node);// 上一个节点 help GCp.next = null;failed = false;// 返回中断标记 falsereturn interrupted;}if (// 判断是否应当 park, 进入 ㈦shouldParkAfterFailedAcquire(p, node) &&// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧parkAndCheckInterrupt()) {interrupted = true;}}} finally {if (failed)cancelAcquire(node);}}// ㈦ AQS 继承过来的方法, 方便阅读, 放在此处private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取上一个节点的状态int ws = pred.waitStatus;if (ws == Node.SIGNAL) {// 上一个节点都在阻塞, 那么自己也阻塞好了return true;}// > 0 表示取消状态if (ws > 0) {// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 这次还没有阻塞// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}// ㈧ 阻塞当前线程private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
}
/*
注意:是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的
waitStatus 决定
*/
解锁源码
// Sync 继承自 AQS
static final class NonfairSync extends Sync {// 解锁实现public void unlock() {sync.release(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final boolean release(int arg) {// 尝试释放锁, 进入 ㈠if (tryRelease(arg)) {// 队列头节点 unparkNode h = head;if (// 队列不为 nullh != null &&// waitStatus == Node.SIGNAL 才需要 unparkh.waitStatus != 0) {// unpark AQS 中等待的线程, 进入 ㈡unparkSuccessor(h);}return true;}return false;}// ㈠ Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryRelease(int releases) {// state--int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 支持锁重入, 只有 state 减为 0, 才释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处private void unparkSuccessor(Node node) {// 如果状态为 Node.SIGNAL 尝试重置状态为 0// 不成功也可以int ws = node.waitStatus;if (ws < 0) {compareAndSetWaitStatus(node, ws, 0);}// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的Node s = node.next;// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
}
2.可重入原理
static final class NonfairSync extends Sync {// ...// Sync 继承过来的方法, 方便阅读, 放在此处final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入else if (current == getExclusiveOwnerThread()) {// state++int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryRelease(int releases) {// state--int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 支持锁重入, 只有 state 减为 0, 才释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
}
3.可打断原理
不可打断模式
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了
// Sync 继承自 AQS
static final class NonfairSync extends Sync {// ...private final boolean parkAndCheckInterrupt() {// 如果打断标记已经是 true, 则 park 会失效LockSupport.park(this);// interrupted 会清除打断标记return Thread.interrupted();}final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (; ; ) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null;failed = false;// 还是需要获得锁后, 才能返回打断状态return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 如果是因为 interrupt 被唤醒, 返回打断状态为 trueinterrupted = true;}}} finally {if (failed)cancelAcquire(node);}}public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {// 如果打断状态为 trueselfInterrupt();}}static void selfInterrupt() {// 重新产生一次中断Thread.currentThread().interrupt();}
}
可打断模式
static final class NonfairSync extends Sync {public final void acquireInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 如果没有获得到锁, 进入 ㈠if (!tryAcquire(arg))doAcquireInterruptibly(arg);}// ㈠ 可打断的获取锁流程private void doAcquireInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 在 park 过程中如果被 interrupt 会进入此// 这时候抛出异常, 而不会再次进入 for (;;)throw new InterruptedException();}}} finally {if (failed)cancelAcquire(node);}}
}