初识AQS

基本的获取锁的方法调用流程
AQS学习

public final void acquire(int arg) {
    // 由子类重写的方法先取尝试一下能不能获取到锁
    if (!tryAcquire(arg) &&
    // 如果没有获取到锁则addWaiter再acquireQueued
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

addWaiter加入队列

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
 // 根据当前线程创建一个node并且入队,先尝试一次,如果失败就用enq方法来重试
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    // 循环重试插入
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

最后是acquireQueued, 一个线程是继续执行还是阻塞就决定在这里了:

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
     // 很多地方会调用,包括condition和上面的acquire.
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //获取锁失败后是否应该停止线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire阻塞线程,设置父节点状态SIGNAL

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
     // 这个方法需要由调用者来循环重试,值到将前一个节点的状态改为-1为止
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 如果前面一个节点的状态是-1则block,表名是正常的可以被唤醒
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //如果不是-1,大于0,说明上一级节点状态是1,异常的,需要过滤掉所有的上一级节点,一致找到上一级节点小于0的,认这个小于0的节点为上一级节点。说白了就是找个靠谱的父节点。
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
            // 这里代表为-2,或者-3,或者0
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

这里借用一下别人的说法"waitStatus 中 SIGNAL(-1) 状态的意思,Doug Lea 注释的是:代表后继节点需要被唤醒。也就是说这个 waitStatus 其实代表的不是自己的状态,而是后继节点的状态,我们知道,每个 node 在入队的时候,都会把前驱节点的状态改为 SIGNAL,然后阻塞,等待被前驱唤醒"

 *   SIGNAL:     The successor of this node is (or will soon be)
 *               blocked (via park), so the current node must
 *               unpark its successor when it releases or
 *               cancels. To avoid races, acquire methods must
 *               first indicate they need a signal,
 *               then retry the atomic acquire, and then,
 *               on failure, block.
 注释:
successor代表此节点的下一个节点(继承者)。
我这个node状态为-1代表我的继承者是或者将要是阻塞的,所以当我这个node释放了或者取消的时候,我必须唤醒我的继承者。为了避免冲突。。。这一段指的应该就是shouldParkAfterFailedAcquire方法吧。
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 如果锁没有被占用
    if (c == 0) {
        // 先看看队列有没有其他元素,有其他元素则直接返回尝试失败
        if (!hasQueuedPredecessors() &&
        // 如果队列没有其他元素,那就尝试来抢一次,如果抢不到也就返回尝试失败
            compareAndSetState(0, acquires)) {
            // 如果尝试成功,则将锁的线程设置为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果获取锁的是当前线程,则重入,增加status的值,因为如果走到这里肯定只有本线程会调用,所以是线程安全的
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

final void lock() {
    acquire(1);
}
public final void acquire(int arg) {
// 获取锁失败
if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

addWaiter(Node.EXCLUSIVE) // 独占模式加入阻塞队列
acquireQueued

相关文章:

  • 2021-04-12
  • 2021-12-28
  • 2021-08-29
  • 2021-08-05
  • 2021-12-01
  • 2022-12-23
  • 2022-12-23
  • 2021-05-08
猜你喜欢
  • 2022-12-23
  • 2021-09-16
  • 2021-08-21
  • 2021-04-19
相关资源
相似解决方案