认识ReentrantLock
ReentrantLock 是通过AQS实现的,AQS 维护了一个 volatile 修饰state 状态值,和一个双向链表(Node中包含prev,next,thread属性);多个线程会通过CAS的方式修改state,在并发情况下,只会有一个线程成功的修改state(从0~1)。修改成功就是抢锁成功,修改失败就是抢锁失败。
抢锁失败会到AQS的双向链表中排队等待(在期间,线程可能会挂起)。
提到并发编程,除了synchronized,你必须了解的便是ReentrantLock。这里我们先一起看看它的庐山真面目,看看它是如何使用的。
public static void main(String[] args) throws InterruptedException {
ReentrantLock nonfair = new ReentrantLock(); //非公平锁
ReentrantLock fair = new ReentrantLock(true); //公平锁
lock.lock(); //锁
boolean falg1 = lock.tryLock(); //tryLock 有返回值
boolean falg2 = lock.tryLock(1, TimeUnit.DAYS); //tryLock时间,有返回值
lock.lockInterruptibly();//可以被打断的lock
lock.unlock(); //解锁
}
ReentrantLock 与 synchronized 比较
-
是否公平锁
synchronized是非公平锁,ReentrantLock可以是公平锁和非公平锁
new ReentrantLock() 非公平锁 new ReentrantLock(true) 公平锁
-
是否互斥锁
synchronized,ReentrantLock 都是互斥锁
-
语言特性
synchronized 是关键字,是基于对象是实现的
ReentrantLock是 Doug Lea 从JDK1.5 引入的,基于AQS 和 CAS 实现
-
功能完善
ReentrantLock 功能更加高级,提供lockInterruptibly 在获取锁的过程中可以被打断。
ReentrantLock提供的tryLock 可以指定过期时间
-
使用
如果竞争比较激烈,推荐lock锁,效率更高
如果几乎没有竞争,推荐synchronized
因为synchronized 的重量级锁不存在降级为轻量级锁或者偏向锁的过程;竞争比较激烈的情况会一直是重量级锁。
源码分析
lock 方法
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
public void lock() {
sync.lock();
}
}
lock 方法内部调用 sync.lock() ,我们一起看看 sync.lock()的真身。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
}
sync. lock() 是个抽象方法, Sync 继承于AbstractQueuedSynchronizer(传说中的AQS)。
而这个lock 方法,有两种是实现,一种公平锁实现,另一种便是非公平锁实现。
公平锁 lock 方法
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
非公平锁lock方法
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
我们可以看到非公平锁比较粗暴,上来便是使用 compareAndSetState 尝试进行一次锁资源的获取,获取成功便设置拥有者为当前线程,否则也执行 acquire(1) 方法。
acquire(1) 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
不管是公平锁还是非公平锁,acquire都是调用AQS的acquire(int arg) 方法。
方法内部流程如下:
-
tryAcquire(arg) 尝试获取锁,拿到锁则返回,直接结束
-
没有拿到锁,addWaiter(Node.EXCLUSIVE), arg) 将没有获取到锁资源的线程封装成Node 对象。
-
继续调用acquireQueued 方法。
tryAcquire 方法
tryAcquire 这里公平锁和非公平锁有不同的实现。
非公平锁实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
### sync的nonfairTryAcquire 方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平锁描述:
-
获取State,如果是0,表示目前没有竞争,去CAS获取锁;获取成功则设置拥有线程是当前线程(这里CAS获取有可能失败)。
-
如果不是0,判断拥有的线程是否与 当前线程相同,相同 则设置state +1;这里其实是锁的可重入。
-
否则返回失败
公平锁实现
### FairSync tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平锁流程描述:
-
获取State 值, 0 表示没有竞争,则调用hasQueuedPredecessors ;看看队列种有没有Node节点在等待获取锁,如果有,条件判断为false 则直接退出。
-
如果没有,则用非公平锁类似,采用CAS 去尝试获取,获取成功则设置为用于线程为当前线程,并返回true。
-
否则返回false。
addWaiter 方法
static final Node EXCLUSIVE = null;
private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), mode);
AbstractQueuedSynchronizer.Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private AbstractQueuedSynchronizer.Node enq(final AbstractQueuedSynchronizer.Node node) {
for (;;) {
AbstractQueuedSynchronizer.Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new AbstractQueuedSynchronizer.Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter 方法描述
-
获取 tail 节点并赋值给pred
-
判断pred 是否为空,不为空;则指定原有tai节点为 pred
-
CAS 设置tail 节点,CAS 可能操作失败;CAS 成功则 把pred 的next 节点设置为当前节点。返回
-
如何CAS 设置失败 通过enq 设置;最终返回。enq 一定可以设置成功。
-
enq 是一个死循环设置的过程,设置成功才会返回。enq 多一个设置head 的过程,下一次循环便可以设置tail 了。
acquireQueued
addWaiter看过后,我们终于可以进入acquireQueued 的流程。
final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final AbstractQueuedSynchronizer.Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
### shouldParkAfterFailedAcquire 判断是否需要Park
private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
int ws = pred.waitStatus;
if (ws == AbstractQueuedSynchronizer.Node.SIGNAL)
return true;
// 循环往前找,找到一个状态小于等于0的节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将小于等于0的节点状态该为-1
compareAndSetWaitStatus(pred, ws, AbstractQueuedSynchronizer.Node.SIGNAL);
}
return false;
}
### 使用LockSupport.park 挂起
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
acquireQueued 流程描述:
-
node.predecessor 获取上一个节点。
-
判断是不是head ,如果是;尝试获取锁资源;成功后,设置头节点,并把下一个节点设置为null
-
如果不是head,则判断是否需要Park
-
shouldParkAfterFailedAcquire 的waitStatus 为.Node.SIGNAL 则返回true;如果节点状态大于0,则表示取消状态,需要一直往前找到状态为 -1 的;如果是 小于等于0的,会修改成-1 状态;(从外层下一次循环就返回true了)
-
parkAndCheckInterrupt 使用操作系统的 LockSupport.park(this) 来挂起。
unlock 方法
以上我们了解清楚了lock 的流程;我们一起再来看看解锁的流程。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
AbstractQueuedSynchronizer.Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
AbstractQueuedSynchronizer.Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
unlock 方法描述:
-
使用sync.release() 然后调用tryRelease 进行锁的释放
-
tryRelease方法 将state 的值减去,因为锁重入的关系,所以可能设置了很多,当等于0的时候,表示锁释放干净了。
-
如果head 不等于null,说明还有线程在竞争这个锁;则取消他们的挂起状态。
ReentrantLock tryLock 方法一览
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
使用的是sync 的nonfairTryAcquire ;在讲非公平锁的tryAcquire的时候讲过。
其他方法可以参考此流程去梳理,大致上类似,有一些基于自己业务的实现。
ReentrantLock 总结
ReentrantLock 是通过AQS实现的,AQS 维护了一个 volatile 修饰state 状态值,和一个双向链表(Node中包含prev,next,thread属性);多个线程会通过CAS的方式修改state,在并发情况下,只会有一个线程成功的修改state(从0~1)。
如果线程修改state失败怎么办?
如果线程没有拿到锁资源,会到AQS的双向链表中排队等待(在期间,线程可能会挂起)。
AQS 问题
AQS 为什么有一个虚拟head 节点?
因为AQS提供了ReentrantLock的基本实现,而在ReentrantLock释放锁资源时,需要去考虑是否需要执行unparkSuccessor方法,去唤醒后继节点。
因为Node中存在waitStatus的状态,默认情况下状态为0,如果当前节点的后继节点线程挂起了,那么就将当前节点的状态设置为-1。这个-1状态的出现是为了避免重复唤醒或者释放资源的问题。
因为AQS中排队的Node中的线程如果挂起了,是无法自动唤醒的。需要释放锁或者释放资源后,再被释放的线程去唤醒挂起的线程。 因为唤醒节点需要从整个AQS双向链表中找到离head最近的有效节点去唤醒。而这个找离head最近的Node可能需要遍历整个双向链表。如果AQS中,没有挂起的线程,代表不需要去遍历AQS双向链表去找离head最近的有效节点。
为了避免出现不必要的循环链表操作,提供了一个-1的状态。如果只有一个Node进入到AQS中排队,所以发现如果是第一个Node进来,他必须先初始化一个虚拟的head节点作为头,来监控后继节点中是否有挂起的线程。
AQS 为什么使用双向链表而不是单向链表?
首先AQS中一般是存放没有获取到资源的Node,而在竞争锁资源时,ReentrantLock提供了一个方法,lockInterruptibly方法,也就是线程在竞争锁资源的排队途中,允许中断。中断后会执行cancelAcquire方法,从而将当前节点状态置为1,并且从AQS队列中移除掉。如果采用单向链表,当前节点只能按到后继或者前继节点,这样是无法将前继节点指向后继节点的,需要遍历整个AQS从头或者从尾去找。单向链表在移除AQS中排队的Node时,成本很高。
当前在唤醒后继节点时,如果是单向链表也会出问题,因为节点插入方式的问题,导致只能单向的去找有效节点去唤醒,从而造成很多次无效的遍历操作,如果是双向链表就可以解决这个问题。
为什么唤醒线程时从尾部找到离head 最近的节点,而不是直接从head 找最近的节点?
口诀:从尾到头,有头有尾;从头到尾,有头无尾。
因为在addWaiter操作时,是先将当前Node的prev指针指向前面的节点,然后是将tail赋值给当前Node,最后才是能上一个节点的next指针,指向当前Node。
如果从前往后,通过next去找,可能会丢失某个节点,导致这个节点不会被唤醒~
如果从后往前找,肯定可以找到全部的节点。
- 本文链接: https://www.sunce.wang/archives/这一次肝了多线程与高并发之深入reentrantlock
- 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!