认识ReentrantLock

ReentrantLock 是通过AQS实现的,AQS 维护了一个 volatile 修饰state 状态值,和一个双向链表(Node中包含prev,next,thread属性);多个线程会通过CAS的方式修改state,在并发情况下,只会有一个线程成功的修改state(从0~1)。修改成功就是抢锁成功,修改失败就是抢锁失败。

抢锁失败会到AQS的双向链表中排队等待(在期间,线程可能会挂起)。

提到并发编程,除了synchronized,你必须了解的便是ReentrantLock。这里我们先一起看看它的庐山真面目,看看它是如何使用的。

public static void main(String[] args) throws InterruptedException {
    ReentrantLock nonfair = new ReentrantLock(); //非公平锁
    ReentrantLock fair = new ReentrantLock(true); //公平锁
    lock.lock(); //锁
    boolean falg1 = lock.tryLock(); //tryLock 有返回值
    boolean falg2 = lock.tryLock(1, TimeUnit.DAYS); //tryLock时间,有返回值
    lock.lockInterruptibly();//可以被打断的lock
    lock.unlock(); //解锁
}

image-1672281203397

ReentrantLock 与 synchronized 比较

  • 是否公平锁

    synchronized是非公平锁,ReentrantLock可以是公平锁和非公平锁

    new ReentrantLock() 非公平锁
    
    new ReentrantLock(true) 公平锁
    
  • 是否互斥锁

    synchronized,ReentrantLock 都是互斥锁

  • 语言特性

    synchronized 是关键字,是基于对象是实现的

    ReentrantLock是 Doug Lea 从JDK1.5 引入的,基于AQS 和 CAS 实现

  • 功能完善

    ReentrantLock 功能更加高级,提供lockInterruptibly 在获取锁的过程中可以被打断。

    ReentrantLock提供的tryLock 可以指定过期时间

  • 使用

    如果竞争比较激烈,推荐lock锁,效率更高

    如果几乎没有竞争,推荐synchronized

    因为synchronized 的重量级锁不存在降级为轻量级锁或者偏向锁的过程;竞争比较激烈的情况会一直是重量级锁。

源码分析

lock 方法


public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    public void lock() {
        sync.lock();
    }
          
}    

lock 方法内部调用 sync.lock() ,我们一起看看 sync.lock()的真身。

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

    abstract void lock();
 
 }  
    

sync. lock() 是个抽象方法, Sync 继承于AbstractQueuedSynchronizer(传说中的AQS)。

而这个lock 方法,有两种是实现,一种公平锁实现,另一种便是非公平锁实现。

公平锁 lock 方法

static final class FairSync extends Sync {
        
    final void lock() {
         acquire(1);
    }
    
}    

非公平锁lock方法

 static final class NonfairSync extends Sync {

     final void lock() {
         if (compareAndSetState(0, 1))
             setExclusiveOwnerThread(Thread.currentThread());
        else
             acquire(1);
        }

      protected final boolean tryAcquire(int acquires) {
          return nonfairTryAcquire(acquires);
      }
 }

我们可以看到非公平锁比较粗暴,上来便是使用 compareAndSetState 尝试进行一次锁资源的获取,获取成功便设置拥有者为当前线程,否则也执行 acquire(1) 方法。

acquire(1) 方法

public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
}

不管是公平锁还是非公平锁,acquire都是调用AQS的acquire(int arg) 方法。
方法内部流程如下:

  • tryAcquire(arg) 尝试获取锁,拿到锁则返回,直接结束

  • 没有拿到锁,addWaiter(Node.EXCLUSIVE), arg) 将没有获取到锁资源的线程封装成Node 对象。

  • 继续调用acquireQueued 方法。

tryAcquire 方法

tryAcquire 这里公平锁和非公平锁有不同的实现。

非公平锁实现

 protected final boolean tryAcquire(int acquires) {
     return nonfairTryAcquire(acquires);
}

### sync的nonfairTryAcquire 方法
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非公平锁描述:

  1. 获取State,如果是0,表示目前没有竞争,去CAS获取锁;获取成功则设置拥有线程是当前线程(这里CAS获取有可能失败)。

  2. 如果不是0,判断拥有的线程是否与 当前线程相同,相同 则设置state +1;这里其实是锁的可重入。

  3. 否则返回失败

公平锁实现

### FairSync tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
}

公平锁流程描述:

  1. 获取State 值, 0 表示没有竞争,则调用hasQueuedPredecessors ;看看队列种有没有Node节点在等待获取锁,如果有,条件判断为false 则直接退出。

  2. 如果没有,则用非公平锁类似,采用CAS 去尝试获取,获取成功则设置为用于线程为当前线程,并返回true。

  3. 否则返回false。

addWaiter 方法

static final Node EXCLUSIVE = null;

private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
    
    AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), mode);

    AbstractQueuedSynchronizer.Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    
    enq(node);
    return node;
    
}


private AbstractQueuedSynchronizer.Node enq(final AbstractQueuedSynchronizer.Node node) {
    for (;;) {
        AbstractQueuedSynchronizer.Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new AbstractQueuedSynchronizer.Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter 方法描述

  1. 获取 tail 节点并赋值给pred

  2. 判断pred 是否为空,不为空;则指定原有tai节点为 pred

  3. CAS 设置tail 节点,CAS 可能操作失败;CAS 成功则 把pred 的next 节点设置为当前节点。返回

  4. 如何CAS 设置失败 通过enq 设置;最终返回。enq 一定可以设置成功。

  5. enq 是一个死循环设置的过程,设置成功才会返回。enq 多一个设置head 的过程,下一次循环便可以设置tail 了。

acquireQueued

addWaiter看过后,我们终于可以进入acquireQueued 的流程。

final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final AbstractQueuedSynchronizer.Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

### shouldParkAfterFailedAcquire 判断是否需要Park

private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
    int ws = pred.waitStatus;
    
    
    if (ws == AbstractQueuedSynchronizer.Node.SIGNAL)
        return true;
    
     // 循环往前找,找到一个状态小于等于0的节点
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
     // 将小于等于0的节点状态该为-1
        compareAndSetWaitStatus(pred, ws, AbstractQueuedSynchronizer.Node.SIGNAL);
    }
    return false;
}

### 使用LockSupport.park 挂起
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

acquireQueued 流程描述:

  1. node.predecessor 获取上一个节点。

  2. 判断是不是head ,如果是;尝试获取锁资源;成功后,设置头节点,并把下一个节点设置为null

  3. 如果不是head,则判断是否需要Park

  4. shouldParkAfterFailedAcquire 的waitStatus 为.Node.SIGNAL 则返回true;如果节点状态大于0,则表示取消状态,需要一直往前找到状态为 -1 的;如果是 小于等于0的,会修改成-1 状态;(从外层下一次循环就返回true了)

  5. parkAndCheckInterrupt 使用操作系统的 LockSupport.park(this) 来挂起。

unlock 方法

以上我们了解清楚了lock 的流程;我们一起再来看看解锁的流程。

 public void unlock() {
   sync.release(1);
 }
 
 
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        AbstractQueuedSynchronizer.Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}       

private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    AbstractQueuedSynchronizer.Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

unlock 方法描述:

  1. 使用sync.release() 然后调用tryRelease 进行锁的释放

  2. tryRelease方法 将state 的值减去,因为锁重入的关系,所以可能设置了很多,当等于0的时候,表示锁释放干净了。

  3. 如果head 不等于null,说明还有线程在竞争这个锁;则取消他们的挂起状态。

ReentrantLock tryLock 方法一览

   public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

使用的是sync 的nonfairTryAcquire ;在讲非公平锁的tryAcquire的时候讲过。

其他方法可以参考此流程去梳理,大致上类似,有一些基于自己业务的实现。

ReentrantLock 总结

ReentrantLock 是通过AQS实现的,AQS 维护了一个 volatile 修饰state 状态值,和一个双向链表(Node中包含prev,next,thread属性);多个线程会通过CAS的方式修改state,在并发情况下,只会有一个线程成功的修改state(从0~1)。

如果线程修改state失败怎么办?

如果线程没有拿到锁资源,会到AQS的双向链表中排队等待(在期间,线程可能会挂起)。

AQS 问题

AQS 为什么有一个虚拟head 节点?

因为AQS提供了ReentrantLock的基本实现,而在ReentrantLock释放锁资源时,需要去考虑是否需要执行unparkSuccessor方法,去唤醒后继节点。

因为Node中存在waitStatus的状态,默认情况下状态为0,如果当前节点的后继节点线程挂起了,那么就将当前节点的状态设置为-1。这个-1状态的出现是为了避免重复唤醒或者释放资源的问题。

因为AQS中排队的Node中的线程如果挂起了,是无法自动唤醒的。需要释放锁或者释放资源后,再被释放的线程去唤醒挂起的线程。 因为唤醒节点需要从整个AQS双向链表中找到离head最近的有效节点去唤醒。而这个找离head最近的Node可能需要遍历整个双向链表。如果AQS中,没有挂起的线程,代表不需要去遍历AQS双向链表去找离head最近的有效节点。

为了避免出现不必要的循环链表操作,提供了一个-1的状态。如果只有一个Node进入到AQS中排队,所以发现如果是第一个Node进来,他必须先初始化一个虚拟的head节点作为头,来监控后继节点中是否有挂起的线程。

AQS 为什么使用双向链表而不是单向链表?

首先AQS中一般是存放没有获取到资源的Node,而在竞争锁资源时,ReentrantLock提供了一个方法,lockInterruptibly方法,也就是线程在竞争锁资源的排队途中,允许中断。中断后会执行cancelAcquire方法,从而将当前节点状态置为1,并且从AQS队列中移除掉。如果采用单向链表,当前节点只能按到后继或者前继节点,这样是无法将前继节点指向后继节点的,需要遍历整个AQS从头或者从尾去找。单向链表在移除AQS中排队的Node时,成本很高。

当前在唤醒后继节点时,如果是单向链表也会出问题,因为节点插入方式的问题,导致只能单向的去找有效节点去唤醒,从而造成很多次无效的遍历操作,如果是双向链表就可以解决这个问题。

为什么唤醒线程时从尾部找到离head 最近的节点,而不是直接从head 找最近的节点?

口诀:从尾到头,有头有尾;从头到尾,有头无尾。

因为在addWaiter操作时,是先将当前Node的prev指针指向前面的节点,然后是将tail赋值给当前Node,最后才是能上一个节点的next指针,指向当前Node。

如果从前往后,通过next去找,可能会丢失某个节点,导致这个节点不会被唤醒~

如果从后往前找,肯定可以找到全部的节点。