Java AQS(AbstractQueuedSynchronizer)解析

2020/01/01 Java

AQS 和 CAS 并不是有你非我的关系。CAS 是线程安全的一种方法,通过 compare and swap 的方式完成线程安全的操作。而 AQS 是一种框架,很多的组件通过它实现,它里面的很多需要线程安全的操作也是通过 CAS 做的。

AQS 介绍

AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如 ReentrantLock、ReentrantReadWriteLock、Semaphore 等)。

AQS 内部已经实现了 Node 和队列,在 AQS 中线程会被封装成一个一个等待进入同步的节点实现同步功能。

AQS 的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。

在 LOCK 包中的相关锁(常用的有 ReentrantLock、 ReadWriteLock)都是基于 AQS 来构建。然而这些锁都没有直接来继承 AQS,而是定义了一个 Sync 类去继承 AQS。那么为什么要这样呢?因为锁面向的是使用用户,而同步器面向的则是线程控制,那么在锁的实现中聚合同步器而不是直接继承 AQS 就可以很好的隔离二者所关注的事情。

AQS 使用一个 int 类型的成员变量 state 来表示同步状态,当 state>0 时表示有线程已经获取了锁并且在排队获取锁,当 state = 0 时表示此时无线程占有锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态 state 进行操作,当然 AQS 可以确保对 state 的操作是安全的。

AQS使用一个整数 state 以表示同步状态,并通过 getState、setState 及 compareAndSetState 等 protected 类型方法进行状态转换。巧妙的使用 state,可以表示任何状态,如:

  • ReentrantLock用state表示所有者线程已经重复获取该锁的次数。
  • Semaphore用state表示剩余的许可数量。
  • CountDownLatch用state表示闭锁的状态,如关闭、打开。
  • FutureTask用state表示任务的状态,如尚未开始、正在运行、已完成、已取消。

除了state,在同步器类中还可以自行管理一些额外的状态变量。如:

  • ReentrantLock保存了锁的当前所有者的信息,这样就能区分某个获取操作是重入的还是竞争的。
  • FutureTask用result表示任务的结果,该结果可能是计算得到的答案,也可能是抛出的异常。

在 AQS 内置的 FIFO 同步队列(称为 CLH 队列)来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS 则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

AQS 方法

AQS主要提供了如下一些方法(以同步状态 state 为核心):

  • getState():返回同步状态的当前值;
  • setState(int newState):设置当前同步状态;
  • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
  • tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态
  • tryRelease(int arg):独占式释放同步状态;
  • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
  • tryReleaseShared(int arg):共享式释放同步状态;
  • isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
  • acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
  • acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
  • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
  • acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
  • release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
  • releaseShared(int arg):共享式释放同步状态;

CLH 同步队列(Craig, Landin, and Hagersten)

AQS 是通过一个双向的 FIFO 同步队列(称为 CLH,队列元素是 Thread)来完成同步状态的管理,当有线程获取锁失败后,就被添加到队列末尾,让我看一下这个队列(实现中是以 Node 链表形式):

AQS队列

红色节点为头结点,可以把它当做正在持有锁的节点。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    static final class Node {...}
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;//同步状态

由上可知,它把 head 和 tail 设置为了 volatile,这两个节点的修改将会被其他线程看到。事实上,我们也主要是通过修改这两个节点来完成入队和出队。接下来一起我们一起学习 Node。

static final class Node {
    //该等待同步的节点处于共享模式
    static final Node SHARED = new Node();
    //该等待同步的节点处于独占模式
    static final Node EXCLUSIVE = null;

    //等待状态,这个和state是不一样的:有1,0,-1,-2,-3五个值
    volatile int waitStatus;
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile Node prev;//前驱节点
    volatile Node next;//后继节点
    volatile Thread thread;//等待锁的线程

    //Link to next node waiting on condition, or the special value SHARED. Condition 的等待队列,如果共享模式没有 Condition,所以就用特殊节点 SHARED 标记
    Node nextWaiter;
    //Returns true if node is waiting in shared mode
    final boolean isShared() {
            return nextWaiter == SHARED;
        }

下面解释下 waitStatus 五个的得含义:

  • CANCELLED(1):该节点的线程可能由于超时或被中断而处于被取消(作废)状态,一旦处于这个状态,节点状态将一直处于 CANCELLED(作废),因此应该从队列中移除。
  • SIGNAL(-1):当前节点为 SIGNAL 时,后继节点会被挂起,因此在当前节点释放锁或被取消之后必须被唤醒(unparking)其后继结点。
  • CONDITION(-2):该节点的线程处于等待条件状态,不会被当作是同步队列上的节点,直到被唤醒(signal),设置其值为 0,重新进入阻塞状态。
  • 0:新加入的节点

在锁的获取时,并不一定只有一个线程才能持有这个锁(或者称为同步状态),所以此时有了独占模式和共享模式的区别,也就是在 Node 节点中由 nextWait 来标识。比如 ReentrantLock 就是一个独占锁,只能有一个线程获得锁,而 WriteAndReadLock 的读锁则能由多个线程同时获取,但它的写锁则只能由一个线程持有。

入列

CLH 队列入列是再简单不过了,无非就是 tail 指向新节点、新节点的 prev 指向当前最后的节点,当前最后一个节点的 next 指向当前节点。代码我们可以看看 addWaiter(Node node) 方法:

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); 
    // Try the fast path of enq; backup to full enq on failure 快速尝试添加尾节点
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {  //CAS设置尾节点 
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

addWaiter(Node node) 先通过 CAS 快速尝试设置尾节点,如果失败,则调用 enq(Node node) 方法设置尾节点:

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) { //一直尝试
        Node t = tail; //tail不存在,设置为首节点   
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在上面代码中,两个方法都是通过一个 CAS 方法 compareAndSetTail(Node expect, Node update) 来设置尾节点,该方法可以确保节点是线程安全添加的。在 enq(Node node) 方法中,AQS通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。

入列

出列

CLH 同步队列遵循 FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head 执行该节点并断开原首节点的 next 和当前节点的 prev 即可,注意在这个过程是不需要使用 CAS 来保证的,因为只有一个线程能够成功获取到同步状态。

出列

/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    /*
        * If status is negative (i.e., possibly needing signal) try
        * to clear in anticipation of signalling.  It is OK if this
        * fails or if status is changed by waiting thread.
        */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
        * Thread to unpark is held in successor, which is normally
        * just the next node.  But if cancelled or apparently null,
        * traverse backwards from tail to find the actual
        * non-cancelled successor.
        */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
    // Makes available the permit for the given thread, if it
    //  * was not already available.  If the thread was blocked on
    //  * {@code park} then it will unblock.  Otherwise, its next call
    //  * to {@code park} is guaranteed not to block. This operation
    //  * is not guaranteed to have any effect at all if the given
    //  * thread has not been started.
        LockSupport.unpark(s.thread);
}

CLH 核心几个回答要点:

  • 双向链表入列出列
  • CAS 算法设置尾节点+死循环自旋。

ConditionObject

通过 newCondition() 方法新建一个 ConditionObject。

ConditionObject

ConditionObject 队列与 CLH 队列:

  • 调用了 await() 方法的线程,会被加入到 conditionObject 等待队列中,并且唤醒 CLH 队列中 head 节点的下一个节点。
  • 线程在某个 ConditionObject 对象上调用了 singnal() 方法后,等待队列中的 firstWaiter 会被加入到 AQS 的 CLH 队列中,等待被唤醒。
  • 当线程调用 unLock() 方法释放锁时,CLH 队列中的 head 节点的下一个节点(在本例中是 firtWaiter),会被唤醒。

区别:

  • ConditionObject 对象都维护了一个单独的等待队列 ,AQS 所维护的 CLH 队列是同步队列,它们节点类型相同,都是 Node。

独占式

同一时刻仅有一个线程持有同步状态,如ReentrantLock。又可分为公平锁和非公平锁。

  • 公平锁: 按照线程在队列中的排队顺序,有礼貌的,先到者先拿到锁。
  • 非公平锁: 当线程要获取锁时,无视队列顺序直接去抢锁,不讲道理的,谁抢到就是谁的。

acquire(int arg)是独占式获取同步状态的方法,我们来看一下源码:

public final void acquire(long arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;并且返回当前线程在等待过程中有没有中断过。

final boolean acquireQueued(final Node node, long arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { //就是个死循环
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

独占式

独占模式同步状态的获取:

AQS的模板方法 acquire 通过调用子类自定义实现的 tryAcquire 获取同步状态失败后 -> 将线程构造成 Node 节点 -> 将 Node 节点添加到同步队列对尾(addWaiter)-> 节点以自旋的方法获取同步状态(acquirQueued)。在节点自旋获取同步状态时,只有其前驱节点是头节点的时候才会尝试获取同步状态,如果该节点的前驱不是头节点或者该节点的前驱节点是头节点单获取同步状态失败,则判断当前线程需要阻塞,如果需要阻塞则需要被唤醒过后才返回。

独占模式同步状态的释放:

首先调用子类的 tryRelease() 方法释放锁,然后唤醒后继节点,在唤醒的过程中,需要判断后继节点是否满足情况,如果后继节点不为且不是作废状态,则唤醒这个后继节点,否则从tail节点向前寻找合适的节点,如果找到则唤醒。

共享式

多个线程可同时执行,如 Semaphore/CountDownLatch 等都是共享式的产物。

acquireShared(long arg) 是共享式获取同步状态的方法,可以看一下源码:

public final void acquireShared(long arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

由上可得,先调用 tryAcquireShared(int arg) 方法尝试获取同步状态,如果获取失败,调用 doAcquireShared(int arg) 自旋方式获取同步状态,方法源码如下:

private void doAcquireShared(long arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                long r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

ReentrantLock

lock()

以非公平锁举例

final void lock() {
    if (compareAndSetState(0, 1)) // CAS,state 如果是 0 代表没人占有,就设为 1,把当前线程设成 owner
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1); // 否则就去队列获取锁
}

下面这个方法是 aqs 类中的了

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //封成一个等待节点,插入尾巴
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { //死循环自旋获取锁
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { //如果是头了,并且可以获取了,就可以占用了
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

tryAcquire 就是尝试获取锁,看看能否获取上,

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) { // CAS 看看能否获取到
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 他已经占有独占锁,给 c +1。
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

unLock()

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); //h 是当前占有的,且不是等待的,唤醒
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    /*
        * If status is negative (i.e., possibly needing signal) try
        * to clear in anticipation of signalling.  It is OK if this
        * fails or if status is changed by waiting thread.
        */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
        * Thread to unpark is held in successor, which is normally
        * just the next node.  But if cancelled or apparently null,
        * traverse backwards from tail to find the actual
        * non-cancelled successor.
        */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null); //成功独占锁设为空
    }
    setState(c);
    return free;
}

Search

    Table of Contents