darling2047

ReentrantLock介绍及源码解析

一、ReentrantLock介绍

  • ReentrantLock是JUC包下的一个并发工具类,可以通过他显示的加锁(lock)和释放锁(unlock)来实现线程的安全访问,ReentrantLock还可以实现公平锁和非公平锁,并且其与synchronized的作用是一致的,区别在于加锁的底层实现不一样,写法上也不一样,具体异同可以参见下图:

二、ReentrantLock的源码简析

1、源码分析

  • ReentrantLock(下面简称RL)就是AQS独占锁的一个典型实现,其通过维护state变量的值来判断当前线程是否能够拥有锁,如果通过cas将state成功从0变成1表示争用资源成功,否则表示争用失败,进入CLH队列,通过CLH队列来维护那些暂时没抢占到锁资源的线程;其内部维护了一个名为Sync的内部类来继承AQS,又因为RL既可以支持公平锁也可以支持非公平锁,所以其内部还维护了两个内部类FairSync和NonfairSync来继承Sync,通过他们来实现AQS的模板方法从而实现加锁的过程;类的关系图如下:

  • 公平锁和非公平锁在源码层的两点区别:

    1、非公平上来直接抢锁

    2、当state=0时,非公平直接抢,公平锁还会判断队列还有没有前置节点

2、lock方法的源码跟踪

下面就让我们跟踪RL的lock()和unLock()源码来看看代码级别是怎么实现的吧!

需要注意的是,本文跟踪的是非公平锁的加解锁过程,公平锁的实现大体一致,当源码中有与公平锁的显著差别时我会通过注释给出解释

  • 试用例如下
public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    List<Thread> list = new ArrayList<>();
    ReentrantLock lock = new ReentrantLock();
    for (int i = 0; i < 1000; i++) {
        Thread thread = new Thread(()-> {
            for (int j = 0; j < 1000; j++) {
                // 解锁
                lock.lock();
                count++;
                // 释放锁
                lock.unlock();
            }
        });
        list.add(thread);
    }
    for (Thread thread : list) {
        thread.start();
    }
    for (Thread thread : list) {
        thread.join();
    }
    System.out.println("auto.count = " + count + "耗时:" + (System.currentTimeMillis() -start));
}

(1)、lock()的源码跟踪与解析

跟踪lock.lock()发现其调用的是内部类Sync的lock()方法,该方法是一个抽象方法,具体实现由FairSync和NonfairSync实现,由于我们构造RL时调用的是无参构造函数,所以这里会直接进入NonfairSync的lock()方法;具体实现代码和注释如下:

/**
 * java.util.concurrent.locks.ReentrantLock.NonfairSync#lock()
 */
final void lock() {
    // 由于是非公平锁所以这里上来直接争抢资源,尝试通过CAS操作将state的值由0变成1
    if (compareAndSetState(0, 1)) 
        // 如果成功将state值变成1表示争抢锁成功,设置当前拥有独占访问权的线程。
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 争抢失败再进入与公平锁一样的排队逻辑
        acquire(1);
}

tips:

1、上面的compareAndSetState方法也是由AQS提供的,里面借助Unsafe实现了对state的cas操作更新

2、setExclusiveOwnerThread也可以理解成由AQS提供(其实是AQS的父类,不过不影响理解),给exclusiveOwnerThread变量赋值,exclusiveOwnerThread表示当前正在拥有锁的线程

3、acquire方法同样由AQS提供,其内部实现也是lock环节比较关键的代码,下面我会详细解释

(2)、acquire()的源码跟踪与解析

acquire方法的源码如下:

/**
 *  java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire(int)
 */
public final void acquire(int arg) {
    /**
     * 1、尝试获取锁;如果成功此方法结束,当前线程执行同步代码块
     * 2、如果获取失败,则构造Node节点并加入CLH队列
     * 3、然后继续等待锁
     */
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果获取锁失败,添加CLH队列也失败,那么直接中断当前线程
        selfInterrupt();
}

tips:

1、tryAcquire方法是AQS的一个模板方法,RL下的公平和非公平锁都有不同的实现,下面会详解

2、addWaiter方法是AQS的一个默认实现方法,负责构造当前线程所在的Node,并将其设置到队列的尾巴上

3、acquireQueued方法也是AQS的默认实现,旨在设置CLH队列的head和阻塞当前线程

上面的三个方法下面也会一一介绍

(3)、tryAcquire()的源码跟踪与解析

  • tryAcquire()方法可以理解成尝试获取锁,如果获取成功即表示当前线程拥有了锁;跟踪源码需要注意的一点是:该方法在非公平锁(NonFairSync)下的实现最终调用的是Sync里的nonfairTryAcquire方法,所以我们直接观察该方法是如何实现的即可
/**
 * java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire(int)
 */
final boolean nonfairTryAcquire(int acquires) {
    // 当前线程
    final Thread current = Thread.currentThread();
    // 获取当前state的值
    int c = getState();
    if (c == 0) {
        /**
         * 非公平锁发现资源未被占用时直接CAS尝试抢占资源;而公平锁发现资源未被占用时
         * 先判断队列里是否还有前置节点再等待,没有才会去抢占资源
         */
        if (compareAndSetState(0, acquires)) {
            // 如果成功将state值变成1表示争抢锁成功,设置当前拥有独占访问权的线程。
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    /** 
     * 如果state!=0表示有争用,再判断当前系统拥有独占权限的线程是不是当前线程,
     * 如果是,则需要支持线程重入,将state的值加1
     */
    else if (current == getExclusiveOwnerThread()) {// 处理可重入的逻辑
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // state既不等于0也不需要重入则返回false;表示获取锁失败,代码返回后继续执行acquireQueued方法
    return false;
}

(4)addWaiter()的源码跟踪与解析

  • 执行到addWaiter方法表示前面的tryAcquire尝试获取锁失败了,需要由此方法构建Node节点并加入到CLH队列的末尾;此方法返回的Node即为当前CLH队列的tail节点
/**
 * java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node)
 */
private Node addWaiter(Node mode) {
    // 构建Node对象
    Node node = new Node(Thread.currentThread(), mode);
    /**
     * 将当前队列的尾节点赋值给pred,通过命名和下面的代码其实可以发现就是想让tail作为当前节点的前置节点;
     * 但是为什么不直接用tail而将其赋值给pred再用呢?我想应该是考虑并发环境下tail的引用有可能会被其他线程改变
     */
    Node pred = tail;
    if (pred != null) {
        // 如果当前队列的尾结点(tail)不为空,就将其作为当前Node节点的前置节点
        node.prev = pred;
        // 然后通过AQS自带的cas方法将当前构建的Node节点插入到队列的尾巴上
        if (compareAndSetTail(pred, node)) {
            // 如果成功了,前置节点也就是之前的tail节点的后继节点就是当前节点,赋值
            pred.next = node;
            // 返回构建的Node节点,即当前队列的tail节点
            return node;
        }
    }
    // 如果队列的tail节点为空,或者cas设置tail节点失败的话调用此方法;旨在重新设置队列的tail节点
    enq(node);
    return node;
}

(5)、acquireQueued()的源码跟踪与解析

  • 当线程通过tryAcquire上锁失败,然后通过addWaiter将当前线程添加到队列末尾后,通过此方法再次判断是否轮到当前节点,并再次尝试获取锁,获取不到的话进行阻塞操作,源码与注释如下:
/**
 * java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node, int)
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取tail节点的前置节点
            final Node p = node.predecessor();
            /**
             * 如果前置节点就是头节点表示当前tail节点就是第二个节点,就可以尝试着去获取锁,
             * 然后将tail节点设置成头节点,返回线程中断状态为false;表示当前线程获取到锁
             */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                /**
                 * 既然tail已经获取到锁了,那么前置节点就没用了,这里将前置节点的next设置为空,
                 * 是为了方便垃圾回收,因为如果不指定为空,前置节点的next就是当前的tail节点,
                 * 不会被回收
                 */
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /**
             * 如果前置节点不为head,或者虽然前置节点是head但是获取锁失败,那么就
             * 需要在这里将线程阻塞,阻塞利用的是LockSupport.park(thread)来实现的
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 退出获取锁
            cancelAcquire(node);
    }
}

至此,RL非公平锁加锁的过程的源码跟踪完毕,流程也不算复杂,下面简单梳理一遍:

1、上来直接尝试获取锁(修改state值),成功表示获取成功

2、否则执行tryAcquire方法尝试通过cas的方式获取锁,并处理可能存在的重入操作

3、获取失败则通过addWriter方法构建Node节点并加入CLH队列的末尾

4、然后在acquireQueued里再次获取锁,获取失败则阻塞当前线程;

下面简单画了一下lock()方法的调用泳道图

1、调用父类AQS的compareAndSetState通过cas的模式尝试将state状态改为1,修改成功则持有锁,将当前线程设为ExclusiveOwnerThread

3、unLock方法的源码跟踪

  • 释放锁其实就是将state状态减1,然后处理可重入逻辑,如果没有重入的话直接唤醒当前队列的head节点,把当前线程所在的Node节点从队列中剔除
  • unLock方法对应AQS的tryRelease模板方法的实现,其没有lock那么复杂,因为不用支持公平和非公平锁,所以其可以直接在sync中调用AQS提供的release方法,然后触发tryRelease,调用sync里的tryRelease实现从而实现解锁

AQS的release源码

/**
 * java.util.concurrent.locks.AbstractQueuedSynchronizer#release(int)
 */
public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        // 释放成功,判断当前队列头节点是否为空,不为空并且等待状态不等于0则唤醒当前队列的头节点
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

RL的tryRelease实现

/**
 * java.util.concurrent.locks.ReentrantLock.Sync#tryRelease(int)
 * @param releases
 * @return
 */
protected final boolean tryRelease(int releases) {
    // state减1
    int c = getState() - releases;
    // 如果当前线程不是正在获取到锁的线程直接抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果state减1后等于0表示没有重入,表示释放锁成功,将当前获取锁的线程置空
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 将最新的state状态更新到AQS中
    setState(c);
    return free;
}

unlock()总结:

1、调用父类AQS的release方法实际调用的是tryRelease这个模板方法由ReentrantLock本身实现

2、tryRelease方法尝试将state减1,如果减完等于0表示解锁成功,将ExclusiveOwner线程设为空;并且唤醒队列的头节点(unparkSuccessor)。

3、如果不等于0表示解锁失败,将state设为减1过后的值;也是为了可重入

相关文章: