基础共识

因为ReentrantLock是互斥锁,如果有一个操作是读多写少,同时还需要保证线程安全,那么使用ReentrantLock会导致效率比较低。于是便出现了ReentrantReadWriteLock。

image-1672393461780

在了解ReentrantReadWriteLock 之前,我们先了解ReentrantReadWriteLock的基本使用,确定一个简单的共识;

读读操作是共享的;
读写操作是互斥的;
写写操作是互斥的;
写读操作是互斥的
同一个线程中,先获取写锁,再获取读锁,可以拿到(写读可重入)
同一个线程中,先获取读锁,再获取写锁,不可以拿到(读写不可重入)

public class XxxTest {
    // 读写锁!
    static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // 写锁
    static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

    // 读锁
    static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) throws InterruptedException {
        readLock.lock();
        try {
            System.out.println("拿到读锁!");
        } finally {
            readLock.unlock();
        }

        writeLock.lock();
        try {
            System.out.println("拿到写锁!");
        } finally {
            writeLock.unlock();
        }
    }
}

ReentrantReadWriteLock 核心思想

ReentrantReadWriteLock还是基于AQS实现的。很多功能的实现和ReentrantLock类似,还是基于AQS的state来确定当前线程是否拿到锁资源。

state表示读锁:将state的高16位作为读锁的标识

state表示写锁:将state的低16位作为写锁的标识

写锁重入过程:

写操作与其他操作时互斥的,代表着同一时间只有一个线程能获取到写锁;所以写锁重入的时候,对低16位 +1 即可;因为state 被拆分成了低16位 和 高16位;所以写锁的重入次数也从 2 ^31 -1 变成了 2^16-1 次。

读锁重入过程:

读操作的重入不能参考写锁,因为读操作时共享的;代表着同一时间,可能有多个线程拿到读锁; 所以每个获得读锁的线程,通过ThreadLocal 来记录重入的次数。

但是这不意味着读锁的重入不需要修改state;事实上,每次锁重入还是要修改state的值,需要基于ThreadLocal的记录。

每个读操作的线程,在获取读锁时都需要开辟ThreadLocal,读写锁为了提升效率,做了两件事情:

  1. 第一个拿到读锁的线程,不用ThreadLocal记录锁重入的次数;而是使用firstRead 记录锁重入的次数。

  2. 也会记录最后一个拿到读锁的线程的重入次数,交给cachedHoldCounter;可以避免频繁在锁重入时修改ThreadLocal。

写锁源码分析流程

写锁获取锁流程

acquire 方法

       public void lock() {
            sync.acquire(1);
        }
        
      // AQS 的 acquire 方法  
      public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我们可以看到,ReentrantReadWriteLock 获取写锁的流程与ReentrantLock十分类似。

acquire 方法描述:

  1. 尝试获取锁资源,获取成功,返回做自己的业务逻辑

  2. 获取失败,则封装成Node,调用addWaiter方法,进入到队列。

  3. acquireQueued:当前排队的能否竞争锁资源,不能竞争就挂起线程阻塞

tryAcquire

tryAcquire 是AQS的抽象方法,具体由ReentrantReadWriteLock具体实现

Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
 if (c != 0) {
    // (Note: if c != 0 and w == 0 then shared count != 0)
    if (w == 0 || current != getExclusiveOwnerThread())
        return false;
    if (w + exclusiveCount(acquires) > MAX_COUNT)
        throw new Error("Maximum lock count exceeded");
    // Reentrant acquire
    setState(c + acquires);
    return true;
}
if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
setExclusiveOwnerThread(current);
        return true;

代码描述:

  1. c 表示当前state的值,如果是0,表示没有锁竞争,如果不等于0表述由锁竞争。

  2. w 是由exclusiveCount 计算得来,表示重入的次数,如果锁的持有者不等于当前线程,则返回false;

  3. 如果锁的持有者是当前线程,判断重入次数是否大于最大重入次数,大于抛异常。小于重入次数增加

  4. 如果state等于0,writerShouldBlock 由公平锁和非公锁两种实现

     非公锁直接返回false,会进行后面的 CAS 尝试获取锁
     
     公平锁则进入判断是否需要排队;需要排队,则返回false;否则也会进行CAS 尝试获取锁
    

    static final int SHARED_SHIFT = 16;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    static int exclusiveCount(int c)

addWaiter

与ReentrantLock 流程类似

acquireQueued

与ReentrantLock 流程类似

写锁释放锁流程


public void unlock() {
    sync.release(1);
}

### AQS 实现的释放锁流程
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        AbstractQueuedSynchronizer.Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release方法描述:

  1. tryRelease 只有tryRealse是读写锁重新实现的方法,其他的和ReentrantLock一致

  2. 判断头节点是否为空,头节点的状态是否可以唤醒

  3. 如果可以唤醒,则唤醒离头节点最近的 waitStatus 为-1 的线程。(还是从tail 往 head 找)


// 读写锁的真正释放
protected final boolean tryRelease(int releases) {
    // 判断释放锁的线程是不是持有锁的线程
    if (!isHeldExclusively())
        // 不是抛异常
        throw new IllegalMonitorStateException();
    // 对state - 1
    int nextc = getState() - releases;
    // 拿着next从获取低16位的值,判断是否为0
    boolean free = exclusiveCount(nextc) == 0;
    // 返回true
    if (free)
        // 将持有互斥锁的线程信息置位null
        setExclusiveOwnerThread(null);
    // 将-1之后的nextc复制给state
    setState(nextc);
    return free;
}

tryRelease 方法描述:

  1. 判断释放锁的线程是否为持有锁的线程,不是抛异常

  2. 是重入次数减少,并计算是否已经释放干净

  3. 释放干净的化,把持有的线程设置为null,并设置state 值

读锁源码分析流程

public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    // tryAcquireShared,尝试获取锁资源,获取到返回1,没获取到返回-1
    if (tryAcquireShared(arg) < 0)
        // doAcquireShared 前面没拿到锁,这边需要排队~
        doAcquireShared(arg);
}

acquireShared 方法描述:

  1. tryAcquireShared 尝试获取锁资源,获取到返回1,没有获取到返回-1

  2. 没有获取到,则进行排队

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);

    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
          
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))      
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

tryAcquireShared 方法描述:

  1. 写锁状态不为0 ,且当前线程不等于持有锁线程,返回-1;排队去

  2. 没有写锁,获取读锁信息;判断是否需要block;判断读锁次数是不是大于MAX

    readerShouldBlock

    公平锁: 有人排队,返回true,直接拜拜,没人排队,返回false
    非公平锁:正常的逻辑是非公平直接抢,因为是读锁,每次抢占只要CAS成功,必然成功

    // 这就会出现问题,写操作无法在读锁的情况抢占资源,导致写线程饥饿,一致阻塞…………
    // 非公平锁会查看next是否是写锁的,如果是,返回true,如果不是返回false

  3. 尝试CAS的方式去获取锁,获取失败会进行fullTryAcquireShared 尝试

  4. 获取成功,判断读锁是不是0(即是不是第一次拿到读锁);firstReaderHoldCount ;如果获取锁的线程是第一次拿到读锁的线程,则进行firstReaderHoldCount 的累加;否则是其他线程,则记录cachedHoldCounter (这个是ThreadLocal 的缓存)

fullTryAcquireShared方法

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            if (firstReader == current) {
              
            } else {
                if (rh == null) {
                 
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) { 
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 如果我的次数是0,绝对不是重入操作!
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; 
            }
            return 1;
        }
    }
}

此方法的绝大多是流程与tryAcquireShared 一致,通过一个for循环一直尝试获取锁。

doAcquireShared 方法

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; 
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                //LockSupport.park挂起~~
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared 方法描述:(获取锁失败后,进行线程的挂起)

  1. 添加SHARED 类型的节点到 队列

  2. 获取之前的节点Node,如果前置节点是head 在进行以此尝试获取锁

  3. 不是head 节点,则判断是不是需要挂起

  4. 需要挂起,则调用parkAndCheckInterrupt 挂起,内部通过LockSupport实现。