基础共识
因为ReentrantLock是互斥锁,如果有一个操作是读多写少,同时还需要保证线程安全,那么使用ReentrantLock会导致效率比较低。于是便出现了ReentrantReadWriteLock。
在了解ReentrantReadWriteLock 之前,我们先了解ReentrantReadWriteLock的基本使用,确定一个简单的共识;
读读操作是共享的;
读写操作是互斥的;
写写操作是互斥的;
写读操作是互斥的
同一个线程中,先获取写锁,再获取读锁,可以拿到(写读可重入)
同一个线程中,先获取读锁,再获取写锁,不可以拿到(读写不可重入)
public class XxxTest {
// 读写锁!
static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 写锁
static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
// 读锁
static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
public static void main(String[] args) throws InterruptedException {
readLock.lock();
try {
System.out.println("拿到读锁!");
} finally {
readLock.unlock();
}
writeLock.lock();
try {
System.out.println("拿到写锁!");
} finally {
writeLock.unlock();
}
}
}
ReentrantReadWriteLock 核心思想
ReentrantReadWriteLock还是基于AQS实现的。很多功能的实现和ReentrantLock类似,还是基于AQS的state来确定当前线程是否拿到锁资源。
state表示读锁:将state的高16位作为读锁的标识
state表示写锁:将state的低16位作为写锁的标识
写锁重入过程:
写操作与其他操作时互斥的,代表着同一时间只有一个线程能获取到写锁;所以写锁重入的时候,对低16位 +1 即可;因为state 被拆分成了低16位 和 高16位;所以写锁的重入次数也从 2 ^31 -1 变成了 2^16-1 次。
读锁重入过程:
读操作的重入不能参考写锁,因为读操作时共享的;代表着同一时间,可能有多个线程拿到读锁; 所以每个获得读锁的线程,通过ThreadLocal 来记录重入的次数。
但是这不意味着读锁的重入不需要修改state;事实上,每次锁重入还是要修改state的值,需要基于ThreadLocal的记录。
每个读操作的线程,在获取读锁时都需要开辟ThreadLocal,读写锁为了提升效率,做了两件事情:
-
第一个拿到读锁的线程,不用ThreadLocal记录锁重入的次数;而是使用firstRead 记录锁重入的次数。
-
也会记录最后一个拿到读锁的线程的重入次数,交给cachedHoldCounter;可以避免频繁在锁重入时修改ThreadLocal。
写锁源码分析流程
写锁获取锁流程
acquire 方法
public void lock() {
sync.acquire(1);
}
// AQS 的 acquire 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们可以看到,ReentrantReadWriteLock 获取写锁的流程与ReentrantLock十分类似。
acquire 方法描述:
-
尝试获取锁资源,获取成功,返回做自己的业务逻辑
-
获取失败,则封装成Node,调用addWaiter方法,进入到队列。
-
acquireQueued:当前排队的能否竞争锁资源,不能竞争就挂起线程阻塞
tryAcquire
tryAcquire 是AQS的抽象方法,具体由ReentrantReadWriteLock具体实现
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
代码描述:
-
c 表示当前state的值,如果是0,表示没有锁竞争,如果不等于0表述由锁竞争。
-
w 是由exclusiveCount 计算得来,表示重入的次数,如果锁的持有者不等于当前线程,则返回false;
-
如果锁的持有者是当前线程,判断重入次数是否大于最大重入次数,大于抛异常。小于重入次数增加
-
如果state等于0,writerShouldBlock 由公平锁和非公锁两种实现
非公锁直接返回false,会进行后面的 CAS 尝试获取锁 公平锁则进入判断是否需要排队;需要排队,则返回false;否则也会进行CAS 尝试获取锁
static final int SHARED_SHIFT = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int exclusiveCount(int c)
addWaiter
与ReentrantLock 流程类似
acquireQueued
与ReentrantLock 流程类似
写锁释放锁流程
public void unlock() {
sync.release(1);
}
### AQS 实现的释放锁流程
public final boolean release(int arg) {
if (tryRelease(arg)) {
AbstractQueuedSynchronizer.Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release方法描述:
-
tryRelease 只有tryRealse是读写锁重新实现的方法,其他的和ReentrantLock一致
-
判断头节点是否为空,头节点的状态是否可以唤醒
-
如果可以唤醒,则唤醒离头节点最近的 waitStatus 为-1 的线程。(还是从tail 往 head 找)
// 读写锁的真正释放
protected final boolean tryRelease(int releases) {
// 判断释放锁的线程是不是持有锁的线程
if (!isHeldExclusively())
// 不是抛异常
throw new IllegalMonitorStateException();
// 对state - 1
int nextc = getState() - releases;
// 拿着next从获取低16位的值,判断是否为0
boolean free = exclusiveCount(nextc) == 0;
// 返回true
if (free)
// 将持有互斥锁的线程信息置位null
setExclusiveOwnerThread(null);
// 将-1之后的nextc复制给state
setState(nextc);
return free;
}
tryRelease 方法描述:
-
判断释放锁的线程是否为持有锁的线程,不是抛异常
-
是重入次数减少,并计算是否已经释放干净
-
释放干净的化,把持有的线程设置为null,并设置state 值
读锁源码分析流程
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
// tryAcquireShared,尝试获取锁资源,获取到返回1,没获取到返回-1
if (tryAcquireShared(arg) < 0)
// doAcquireShared 前面没拿到锁,这边需要排队~
doAcquireShared(arg);
}
acquireShared 方法描述:
-
tryAcquireShared 尝试获取锁资源,获取到返回1,没有获取到返回-1
-
没有获取到,则进行排队
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
tryAcquireShared 方法描述:
-
写锁状态不为0 ,且当前线程不等于持有锁线程,返回-1;排队去
-
没有写锁,获取读锁信息;判断是否需要block;判断读锁次数是不是大于MAX
readerShouldBlock
公平锁: 有人排队,返回true,直接拜拜,没人排队,返回false
非公平锁:正常的逻辑是非公平直接抢,因为是读锁,每次抢占只要CAS成功,必然成功// 这就会出现问题,写操作无法在读锁的情况抢占资源,导致写线程饥饿,一致阻塞…………
// 非公平锁会查看next是否是写锁的,如果是,返回true,如果不是返回false -
尝试CAS的方式去获取锁,获取失败会进行fullTryAcquireShared 尝试
-
获取成功,判断读锁是不是0(即是不是第一次拿到读锁);firstReaderHoldCount ;如果获取锁的线程是第一次拿到读锁的线程,则进行firstReaderHoldCount 的累加;否则是其他线程,则记录cachedHoldCounter (这个是ThreadLocal 的缓存)
fullTryAcquireShared方法
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 如果我的次数是0,绝对不是重入操作!
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
此方法的绝大多是流程与tryAcquireShared 一致,通过一个for循环一直尝试获取锁。
doAcquireShared 方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
//LockSupport.park挂起~~
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireShared 方法描述:(获取锁失败后,进行线程的挂起)
-
添加SHARED 类型的节点到 队列
-
获取之前的节点Node,如果前置节点是head 在进行以此尝试获取锁
-
不是head 节点,则判断是不是需要挂起
-
需要挂起,则调用parkAndCheckInterrupt 挂起,内部通过LockSupport实现。
- 本文链接: https://www.sunce.wang/archives/zhe-yi-ci-gan-le-duo-xian-cheng-yu-gao-bing-fa-zhi-shen-ru-reentrantreadwritelock
- 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!