# AQS详解

在解决并发问题时,开发者需要各种同步功能进行线程间的通信、协调、控制,比如信号量、锁、条件等待通知等等。 JDK提供了AQS作为实现这些同步器(synchronizer)的基础,jdk中的并发工具类比如ReentrantLock, Semaphore, CountDownLatch等都是使用AQS来实现各自的线程同步能力的。

AQS因为方法命名比较抽象,导致很多朋友开始难以理解各个方法的作用、原理,不用担心,通过学习本文今天我们彻底掌握AQS。

# AQS使用

# AQS整体实现设计

AQS把同步器的操作分成两个操作,分别是acquire获取和release释放。 acquire和release操作的对象是AQS中的state变量。

比如对于锁来说,加锁对应acquire,释放锁对应release;对于Semaphore信号量来说,获取permit对应acquire,释放permit对应release。

在实现具体的同步器时,需要实现tryAcquire和tryRelease。 tryAcquire定义了获取成功的定义,比如在ReentrantLock中,state为0表示未加锁,大于0表示已加锁, tryAcquire就需要定义如何操作state表示当前线程获得了锁。 AQS中state字段是一个volatile的int实例字段,AQS提供了get,set和compareAndSetState方法来查看修改state的值。对于一个简单的锁实现来说acquire实现就是从0的状态下cas成1说明acquire成功, release就是把state设置为0,完善的锁实现还需要考虑重入、读写锁等功能,在后面会详细讲解。 tryRelease是释放时对state的修改。

img.png

AQS内部包含一个线程等待队列,若干个条件队列。

线程等待队列中存放的是等待的线程,线程要等待去执行一些代码,但是没有满足条件。

AQS把同步器的操作抽象等两个类型,一个是acquire获取,一个是release。acquire表示线程尝试获取某个资源,可能会被阻塞(比如加锁等),线程获取到之后 执行完自己的逻辑可以释放这个资源,也就是通过release,release可能能够使得在等待acquire的线程从挂起状态唤醒尝试重新获取。 如果一个线程acquire失败时,怎么处理呢?对于大部分的同步器来说,等待是一个很好的选择,等待到acquire成功,那么如何等待、如何唤醒、如何重新尝试acquire等 细节都是由AQS内部实现的。

当acquire调用tryAcquire失败时,就会将线程封装成一个Node放到Node队列中,Node等待其他线程唤醒,已经acquire成功的线程负责在release时唤醒Node队列中等待的 Node,Node唤醒后会重新尝试tryAcquire,如果tryAcquire成功则出队,否则继续等待。

让一个线程等待同步器,方法是将线程挂起,并将包含线程信息的Node通过cas追加到队列的尾部,保证并发场景下的原子性。

让一个线程从同步器中释放,表示通过了同步器的条件,可以执行特定代码,方法是从队列中出队一个Node节点,把队列head指向下一个等待的线程Node,保证原子性。

在独占锁这类场景中,同一时间只能有一个线程获取到锁(acquire),但是在读写锁、Semaphore、CountDownLatch等场景是要支持同一时间多个线程同时能acquire成功的。 所以AQS提供了独占(exclusived)和共享(shared)两种模式。 默认的独占模式下acquire和release调用的尝试方法为tryAcquire和tryRelease,共享模式下acquire和release的尝试方法对应tryAcquireShared和tryReleaseShared。

# 队列

为了方便操作,AQS中有一个dummy头结点,这个头结点便于判断一个Node是否是第一个节点(通过prev==head判断能够和null区分开), 为了节省内存,初始状态下head和tail都为null,第一次入队时初始化,给head和tail指向一个dummy节点无意义的Node对象上。

# 队列原子操作

队列的基本操作包含入队、出队、遍历节点、删除节点。

# 入队

无并发场景下,给一个队列入队,只需要找到队尾的Node对象,然后将该对象的next指针指向新的队尾Node对象,然后将队尾指针指向新的队尾Node对象。 不过在并发场景下,入队操作中包含若干个子步骤,所以不能保证原子性,一般要解决这类问题需要加锁,但是AQS是实现锁的底层技术,不适合套娃(性能等方面考虑)。 所以AQS中入队采取了弱一致性策略,只保证tail指针的正确性,tail指针指向真正的队尾结点,入队操作是cas tail指针由旧队尾节点改为新节点,cas成功说明这个 新Node对象入队成功,否则需要重试。所以其他的一些条件会有可能不一致,比如队列中一个Node的next==null并不能保证这个Node节点是队尾节点。 AQS在入队前会给新tail的prev设置为旧的tail,所以prev!=null不能说明Node在队列中,但是prev==null说明Node一定不在队列中。 又因为next引用的不一致性,所以遍历队列需要从队尾向前遍历。

入队时先设置prev再cas tail,能够保证在队列阻塞中的node一定能够被通知到(对于cancelled节点会通过cleanQueue来通知) 通知的实现机制是,被通知的线程先设置WAITING状态,然后重新尝试tryAcquire,然后再在blocking前重新检查status。进行通知的线程在unpark队列中阻塞的Node之前通过cas原子清理Node的WAITING状态。 具体的实现和原有后面会分析。

first节点(head的next)acquire成功后需要出队,会把first节点作为新的Node节点。步骤是先把first节点的prev设置为null,然后再更新head引用。 所以head指向一个对象并不能保证这个Node在AQS队列中,因为出队是先修改first节点的prev,再更新的head引用,Node的prev==null是可以说明这个Node是head节点的。

在队列中等待的节点,它的前继节点(predecessor、prev)在等待过程中可能会变化,因为prev节点可能会因为超时或中断变成CANCELLED然后被清理。 除非节点已经变成first节点,head不会cancel。acquire方法中会在进入WAITING挂起前检查一下pred的status,如果是CANCELLED,则会cleanQueue清理队列。

在cleanQueue中,会对找到的CANCELLED节点的前后节点的prev和next字段通过cas进行修改,一个线程成功cas修改prev后,其他线程可以帮忙cas修改next。 每次调用cleanQueue方法,都会不断遍历队列直到队列清理完成。如果清理之后Node变成了first节点,则一定调用unpark避免出现前继节点cancelled导致节点没有唤醒的边界情况。

插队问题: 即使线程位于first节点,也不能保证在acquire时一定acquire成功,因为有可能有其他线程调用acquire时,会先tryAcquire而不是先入队,所以可能有插队问题,这是为了 提升整体的吞吐率考虑的,因为有可能在first唤醒到tryAcquire成功之间,另一个线程已经完成了acquire和release,提升了吞吐。 发生了插队导致first节点tryAcquire失败的,first节点还需要继续等待,为了避免first节点长期抢不到tryAcquire,first节点会尝试穿插自旋等待和tryAcquire重试,重试一定次数后再park,每次park前增加重试次数最大127,超过之后不再park一直重试。

为了提升GC效率,从队列中出队的Node的字段都会设置为null。这会让getFirstQueuedThread稍微复杂,在实现过程中如果出现了数据不一致情况,则会从tail开始遍历。

队列需要一个dummy header node节点,但是为了节省内存空间,head和tail在第一次出现tryAcquire竞争(有线程tryAcquire失败)后才会创建head和tail节点。

shared模式和exclusive模式的区别是,一个node acquire成功后,如果next节点也是shared节点,会继续signal这个节点让它也唤醒尝试tryAcquire而不需要等待release才signal。

每个条件对象中存在一个条件队列,条件队列不需要支持并发,因为条件队列只在exclusive模式下使用,也就是同一时间只有一个线程能访问修改队列。 线程要在条件上等待时,会将node入队到条件队列中,signal的时候,node会从条件队列转移到AQS等待队列。

Node类中的head,tail,state字段使用volatile声明,并且使用Unsafe类的cas等方法进行访问修改。 waiter字段没有使用volatile声明,因为waiter字段总是在其他的volatile字段读写之间使用,能够利用jmm规则保证可见性。

# 结构

private transient volatile Node head;

/**
 * Tail of the wait queue. After initialization, modified only via casTail.
 */
private transient volatile Node tail;

/**
 * The synchronization state.
 */
private volatile int state;
1
2
3
4
5
6
7
8
9
10
11

Node定义了几个volatile字段, 并且提供了常用的cas、get、set方法。

waiter为什么不是volatile字段呢?因为waiter线程字段只会在入队时设置一次,并且在其他字段的volatile写之前写入,在其他volatile字段之后读取从而保证可见性。

Node中的setPrevRelaxed、setStatusRelaxed等方法,没有使用volatile语义,而是在使用时通过调用前后其他字段的volatile读写的happen-before语义保证可见性,降低内存屏障的使用提高性能。

比如以acquire方法中入队的步骤为例。 对于一个线程来说,执行acquire方法时,按照Java内存模型(Java Memory Model),代码的先后顺序在线程内符合happen-before原则, 所以setPrevRelaxed happen before与casTail,又因为casTail是Unsafe.compareAndSetReference,保证tail引用的volatile读写语义, happen before与之后其他线程的tail读,所以,这里的setPrevRelaxed设置prev引用也happen before与之后其他线程的tail读。

node.setPrevRelaxed(t);         // avoid unnecessary fence
if (t == null)
    tryInitializeHead();
else if (!casTail(t, node))
    node.setPrevRelaxed(null);  // back out
1
2
3
4
5

其他线程在读取tail后happen before与之后的读取prev,也就能推断出写入prev happen before与读取prev

比如getQueuedThreads方法,会先读取tail,再读取tail Node的prev,就能保证写入prev happen before与读取prev,也就能保证可见性。

public final Collection<Thread> getQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<>();
    for (Node p = tail; p != null; p = p.prev) {
        Thread t = p.waiter;
        if (t != null)
            list.add(t);
    }
    return list;
}
1
2
3
4
5
6
7
8
9
abstract static class Node {
    volatile Node prev;       // initially attached via casTail
    volatile Node next;       // visibly nonnull when signallable
    Thread waiter;            // visibly nonnull when enqueued
    volatile int status;      // written by owner, atomic bit ops by others

    // methods for atomic operations
    final boolean casPrev(Node c, Node v) {  // for cleanQueue
        return U.weakCompareAndSetReference(this, PREV, c, v);
    }
    final boolean casNext(Node c, Node v) {  // for cleanQueue
        return U.weakCompareAndSetReference(this, NEXT, c, v);
    }
    final int getAndUnsetStatus(int v) {     // for signalling
        return U.getAndBitwiseAndInt(this, STATUS, ~v);
    }
    final void setPrevRelaxed(Node p) {      // for off-queue assignment
        U.putReference(this, PREV, p);
    }
    final void setStatusRelaxed(int s) {     // for off-queue assignment
        U.putInt(this, STATUS, s);
    }
    final void clearStatus() {               // for reducing unneeded signals
        U.putIntOpaque(this, STATUS, 0);
    }

    private static final long STATUS
        = U.objectFieldOffset(Node.class, "status");
    private static final long NEXT
        = U.objectFieldOffset(Node.class, "next");
    private static final long PREV
        = U.objectFieldOffset(Node.class, "prev");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

Node的status有几种状态

Node的status默认为0,WAITING(1) 表示Node在队列中等待唤醒,COND(2)表示条件等待, WAITING和COND作为标记位使用,所以status也可以同时是WAITING和COND状态(状态是3),用bit判断是否是WAITING和COND。 如果Node因为超时或中断,需要取消,Node的状态是CANCELLED(0x80000000)是负数。

// Node status bits, also used as argument and return values
static final int WAITING   = 1;          // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND      = 2;          // in a condition wait
1
2
3
4
{

    // Concrete classes tagged by type
    static final class ExclusiveNode extends Node { }
    static final class SharedNode extends Node { }

    static final class ConditionNode extends Node
        implements ForkJoinPool.ManagedBlocker {
        ConditionNode nextWaiter;            // link to next waiting node

        /**
         * Allows Conditions to be used in ForkJoinPools without
         * risking fixed pool exhaustion. This is usable only for
         * untimed Condition waits, not timed versions.
         */
        public final boolean isReleasable() {
            return status <= 1 || Thread.currentThread().isInterrupted();
        }

        public final boolean block() {
            while (!isReleasable()) LockSupport.park();
            return true;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# AQS实现

# acquire

acquire方法将JDK8中的acquireQueued、doAcquireNanos、doAcquireShared等各种版本的acquire逻辑统一起来,减少重复代码。

acquire方法参数

node: 只有Condition条件下不是null,其他情况都是null arg: acquire的参数 shared: 是shared模式还是exclusive模式 interruptible: 是否能响应中断 timed: 如果是true,会使用time参数作为超时时间实现超时 time: 超时时间,单位纳秒

方法的返回值

如果acquire成功返回正数 如果acquire超时返回0 如果被interrupt返回负数

acquire执行的过程中,在多种情况下都会调用tryAcquire,最后都不成功的情况下才设置WAITING状态等待,让线程进入等待状态一会再唤醒是一个相对比较重的操作。

final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    boolean interrupted = false, first = false;
    Node pred = null;                // predecessor of node when enqueued

    /*
     * Repeatedly:
     *  Check if node now first
     *    if so, ensure head stable, else ensure valid predecessor
     *  if node is first or not yet enqueued, try acquiring
     *  else if node not yet created, create it
     *  else if not yet enqueued, try once to enqueue
     *  else if woken from park, retry (up to postSpins times)
     *  else if WAITING status not set, set and retry
     *  else park and clear WAITING status, and check cancellation
     */

    for (;;) {
        // 如果first为false(初始值为false),给pred赋值,值为 null或node.prev,如果pred != null
        // 再判断下pred是不是head节点,如果pred是head,赋值first = true, 如果first还是false,进入if
        // first赋值为true,只会出现在pred != null的前提下,pred != null说明node != null, 也能推断出来 head != null
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            //
            if (pred.status < 0) {
                cleanQueue();           // predecessor cancelled
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        // 如果是first,或者pred是null(node参数传入的null的情况下)
        if (first || pred == null) {
            boolean acquired;
            // 先快速调用一下tryAcquireShared或tryAcquire尝试修改AQS state状态,如果tryAcquire成功,则不需要入队。
            try {
                if (shared)
                    // shared模式调用tryAcquireShared
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    // 如果不是shared,那就是exclusive mode,调用tryAcquire
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                // 出现异常调用cancelAcquire
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            if (acquired) {
                // first说明node不为null,pred == head,则需要清理node.prev引用也就是head(避免再遍历到旧head),把当前的node作为新的head,
                // 如果不是first,说明当前线程还没有入队,则不需要修改队列状态 
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    // shared模式下,后面的node也有可能acquire成功,需要signal
                    if (shared)
                        signalNextIfShared(node);
                    // 判断下interrupted状态,如果被interrupted了,调用线程current.interrupt()方法设置中断状态
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        // 走到这里,说明Node
        // 如果node为空,需要创建Node并加入队列
        if (node == null) {                 // allocate; retry before enqueue
            if (shared)
                // shared模式创建SharedNode类对象
                node = new SharedNode();
            else
                // exclusive模式创建ExclusiveNode类对象
                node = new ExclusiveNode();
            // 创建完node后,会从头开始执行if,后面会走到pred == null的 else if判断里
            // 没有设置status、prev,是为了能够再次tryAcquire
        } else if (pred == null) {          // try to enqueue
            // pred == null,说明不在队列中
            node.waiter = current;
            // 获取当前tail引用
            Node t = tail;
            // 先设置prev为当前的tail, TODO: 解释为什么可以用releaxed,而不是volatile语义等方法,利用happen before规则
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            // 如果tail为null,说明队列是空的,调用tryInitializeHead通过cas的方式初始化队列,设置dummy head(同时也是tail)
            if (t == null)
                tryInitializeHead();
            // 如果tail不为null,则通过cas修改tail引用为当前的新node,cas tail成功表示入队成功,否则重试
            else if (!casTail(t, node))
                // 没有入队成功,清理prev引用
                node.setPrevRelaxed(null);  // back out 
            else
                // cas tail 入队成功,修改oldTail的next引用到新的tail
                t.next = node;
            // 入队后依然没有设置成WAITING状态,目的是再给一次循环判断的机会,如果现在Node处于头结点了,并且tryAcquire成功,则acquire成功,否则再降级到WAITING
        } else if (first && spins != 0) {
            // 头结点的情况,使用自旋等待,自旋通过短暂的wait实现
            // 如果是头结点,并且spins不为0,则spins减一,并且调用Thread.onSpinWait()方法
            --spins;                        // reduce unfairness on rewaits
            // Thread.onSpinWait()方法用于此类自旋等待场景,但是当前的线程还不能继续,所以调用短暂的wait等待,运行时可以进行性能优化。
            Thread.onSpinWait();
        } else if (node.status == 0) {
            // 如果status=0,修改状态为WAITING状态,这样node可以接收signalNext/signalNextIfShared的unpark唤醒
            // 为什么不在创建完Node后直接设置status为WAITING呢?目的是这样能创建Node后等情况下调用tryAcquire尝试下acquire,避免过早进入WAITING状态,因为挂起线程是一个相对比较重的操作涉及到线程上下文切换等。
            node.status = WAITING;          // enable signal and recheck
            // 设置完成后,还会再运行循环检查下tryAcquire
        } else {
            // 走到这里说明node != null, pred != null, node.status != 0, 不是first或spins=0,且已经给node.status设置了WAITING状态(或CANCELLED)
            long nanos;
            // postSpins = postSpins * 2 + 1,超过byte上限后变为负数
            // spins用于解决位于first的节点但是在acquire时被频繁插队的问题,这种情况采取指数级增长的自旋等待重试
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)
                // 如果没有设定超时时间,则调用LockSupport.park让线程等待,直到通过LockSupport.unpark唤醒这个线程
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                // 有超时时间的,如果当前还没有超时,则调用LockSupport.parkNanos等待一段时间
                LockSupport.parkNanos(this, nanos);
            else
                // 如果有超时时间,现在已经超时了,则跳出循环,走到最后的cancelAcquire中
                break;
            // 修改node状态为0, 因为状态已经不是WAITING状态了
            node.clearStatus();
            // 判断如果当前线程被interrupted中断,并且参数传入的interruptible为true,设置interrupted为true
            if ((interrupted |= Thread.interrupted()) && interruptible)
                // 中断后,退出循环,醉倒最后的cancelAcquire中。
                break;
        }
    }
    // cancelAcquire会将node状态改为CANCELLED,并且清理队列中断CANCELLED节点。
    return cancelAcquire(node, interrupted, interruptible);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}
1
2
3
4
5
6
7

signalNext方法负责在release时唤醒队列中的当前Node的后继节点(如果有的话),并且会取消Node的status的WAITING标记位。 如果后继节点已经cancelled,可能会导致它以及后面的非cancelled的WAITING节点不能唤醒,但是cancelAcquire方法能够保证清理cancelled节点时,触发 队首节点唤醒,Node线程唤醒后,会继续执行acquire中的循环逻辑,判断自己是否是头结点是否可以tryAcquire,如果tryAcquire成功说明同步器获取成功执行自己的代码逻辑。

private static void signalNext(Node h) {
    Node s;
    // 判断下一个Node节点,如果status != null,说明是WAITING状态的Node
    if (h != null && (s = h.next) != null && s.status != 0) {
        // 清理WAITING状态标记位
        s.getAndUnsetStatus(WAITING);
        // 调用unpark唤醒Node上的线程
        LockSupport.unpark(s.waiter);
    }
}
1
2
3
4
5
6
7
8
9
10

# 线程挂起、唤醒的LockSupport类方法

AQS中需要在需要的时候让线程等待、唤醒线程,有同学可能知道Thread类里也有几个看上去相关的方法,比如 Thread.stop、Thread.suspend、Thread.resume, 但是这些方法因为(线程monitor lock被释放可能导致状态不一致、死锁等)各种不安全问题被废弃了,更详细可以参考文章 threadPrimitiveDeprecation (opens new window)

在jdk1.5,jdk中提供了LockSupport类,来方便实现线程等待、唤醒功能。

park()方法用户让线程挂起处于等待状态(WAITING)

public static void park() {
    U.park(false, 0L);
}
1
2
3

除了无参数的park()方法,LockSupport还提供了可以设置超时时间等待的parkNanos方法

unpark方法,unpark方法用于唤醒线程。

public static void unpark(Thread thread) {
    if (thread != null)
        U.unpark(thread);
}
1
2
3
4

# 条件等待、通知(ConditionObject)

img.png

同步器除了常见的等待,有时还需要实现更细粒度的等待功能,比如条件等待,在某个条件不满足时,线程在条件等待队列(不同于AQS的等待队列)中等待,条件满足后, 会从条件等待队列中移出加入到AQS等待队列中,然后在AQS队列中如果acquire成功,需要再判断条件是否满足(acquire之前条件状态可能已经被其他修改了),满足后执行业务逻辑,否则等待。

目前在锁(ReentrantLock, ReentrantReadWriteLock)中使用了这个条件等待通知功能,用于在一些情况下代替java的synchronized以及搭配的wait/notify/notifyAll。

等待和通知是条件等待的两个基本方法,等待用于在不满足条件时在条件对象(ConditionObject)中等待,通知用于在条件满足后将在条件等待队列中的Node转移到 AQS等待队列中,好让该Node有机会重新判断条件。

需要注意的是等待和通知都需要在已经加锁的前提下使用

常见的使用方式是

private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void conditionWait() throws InterruptedException {
    // 必须要要先加锁,再判断条件,如果反过来可能出现竞态条件,比如两个线程同时判断满足条件,都去加锁,修改状态,
    // 慢的线程获取到锁时,条件可能已经不满足了
    lock.lockInterruptibly();
    try {
        // while循环判断,以便条件不满足等待,等待结束后返回还要继续判断条件,知道
        while (!条件不满足) {
            try {
                // await需要在和condition对应的锁加锁的情况下执行。
                // 调用await会使得当前线程释放锁并挂起
                condition.await();
                // await()返回前,会重新获取到锁。
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 走到这里说明条件满足,执行逻辑
    } finally {
        // finally中释放锁,避免异常导致锁无法释放
        lock.unlock();
    }
}

public void conditionSignal() throws InterruptedException {
    // 需要加锁
    lock.lockInterruptibly();
    try {
        修改状态代码
        // 修改了状态,可能能使其他线程的等待条件满足,调用signalAll
        // 调用signalAll能够避免
        condition.signalAll();
    } finally {
        // finally中释放锁,避免异常导致锁无法释放
        lock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 条件等待

条件对象(ConditionObject)是通过AQS对象的newCondition()方法创建出来的。

加完锁(AQS实现的锁)后,如果判断条件不满足,需要调用ConditionObject.await()将当前线程加入到条件对象的条件等待队列中, 线程会变成等待状态等待被signal通知后进入到AQS队列中,进入到AQS队列中后,会重新尝试获取AQS的acquire,acquire成功后,在锁的场景下,表示再次拿到了AQS的锁 需要再判断条件,如果条件满足,执行逻辑,否则还需要while循环进入await

下面以await()方法代码进行分析。(await还有带超时的版本,实现也类似)

public final void await() throws InterruptedException {
    // 提前判断一下是否已经被中断,如果中断,抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 创建一个ConditionNode对象
    ConditionNode node = new ConditionNode();
    // 把ConditionNode加入到当前的ConditionObject的条件等待队列中,会设置Node的status为COND|WAITING
    int savedState = enableWait(node);
    LockSupport.setCurrentBlocker(this); // for back-compatibility
    boolean interrupted = false, cancelled = false, rejected = false;
    // 循环等待,canReacquire表示node在AQS的等待队列中,在没有其他线程通过signal/signalAll把当前Node转移到AQS的等待队列之前,canReacquire都返回false
    while (!canReacquire(node)) {
        // 不再AQS队列中不能进行AQS的acquire,需要在条件队列中WAITING等待
        // 判断一下线程是否被中断,如果中断,取消Node的COND状态标记位,退出循环
        if (interrupted |= Thread.interrupted()) {
            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                break;              // else interrupted after signal
        } else if ((node.status & COND) != 0) {
            // 在Node的status中包含COND的情况下
            try {
                // 如果被rejected(默认false),调用node.block
                if (rejected)
                    node.block();
                else
                    // 否则使用ForkJoinPool.managedBlock使Node挂起(最终还是调用的LockSupport.park());如果当前线程是ForkJoinWorkerThread则会有拒绝逻辑,也就是rejected
                    ForkJoinPool.managedBlock(node);
            } catch (RejectedExecutionException ex) {
                // 如果被ForkJoinPool reject,设置rejected为true,下次执行会直接调用node.block,最终也会调用到LockSupport.park()
                rejected = true;
            } catch (InterruptedException ie) {
                // 捕获中断异常,在外层处理中断,需要从等待队列中删除Node
                interrupted = true;
            }
        } else
            Thread.onSpinWait();    // awoke while enqueuing
    }
    // Node线程从上面的park等待中恢复的时机有
    // 1. 被signal/signalAll之后加入到AQS等待队列,并被AQS release时signalNext唤醒
    // 2. 线程被中断
    // 3. 如果试用的是带有超时时间的await方法,超过超时时间后也会恢复
    LockSupport.setCurrentBlocker(null);
    node.clearStatus();
    // 执行到这里,说明Node已经在AQS队列中并且是first节点了,还需要acquire才能进入到同步器中,
    acquire(node, savedState, false, false, false, 0L);
    if (interrupted) {
        if (cancelled) {
            unlinkCancelledWaiters(node);
            throw new InterruptedException();
        }
        Thread.currentThread().interrupt();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

为什么要用ForkJoinPool.managedBlock(node)

# 条件通知(signal/signalAll)

signal/signalAll实现

signal方法负责给ConditionObject的第一个Node(也是等待时间最长的)从条件等待队列中移出加入到AQS的等待队列中。 signalAll方法则把当前ConditionObject对象的条件等待队列上的所有Node都从条件等待队列中移出并加入到AQS的等待队列中。 signal和signalAll代码基本一致,除了调用doSignal中的all参数不同。

signal()方法

public final void signal() {
    ConditionNode first = firstWaiter;
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    if (first != null)
        doSignal(first, false);
}
1
2
3
4
5
6
7

doSignal方法,如果all是false(signal),则找到队列第一个waiter,出队,然后去掉Node的status的COND状态标记,调用enqueue(first)把这个Node 加入到AQS的等待队列中。对于all是true的情况,则会遍历队列,把所有的Node都出队然后加入到AQS的等待队列中。

private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        ConditionNode next = first.nextWaiter;
        if ((firstWaiter = next) == null)
            lastWaiter = null;
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
            enqueue(first);
            if (!all)
                break;
        }
        first = next;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

目前只给signal使用的enqueue方法,和acquire的入队过程类似,先设置prev,判断队列是否为空(tail == null),如果是初始化队列。队列不为空则cas tail入队。

入队完成后,还判断了下Node的status,如果小于0,说明这个Node被Cancelled取消了。

final void enqueue(Node node) {
    if (node != null) {
        for (;;) {
            Node t = tail;
            node.setPrevRelaxed(t);        // avoid unnecessary fence
            if (t == null)                 // initialize
                tryInitializeHead();
            else if (casTail(t, node)) {
                t.next = node;
                if (t.status < 0)          // wake up to clean link
                    LockSupport.unpark(node.waiter);
                break;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# AQS和ConditionObject的关系,以及两者的队列的关系。

创建完AQS对象后,通过对象的实例方法可以创建ConditionObject,对于ConditionObject的await和signal方法都需要保证执行方法在AQS exclusive独占持有的情况下执行,参考ReentrantLock中的使用例子。 在await时,当前线程会创建一个ConditionNode入队到ConditionObject的条件等待队列中然后释放锁,唤醒后需要重新获取AQS锁才能从await方法中返回;在signal/signalAll方法调用时,会将条件队列中的Node(signal第一个/signalAll全部)从条件等待队列中出队并入队到AQS等待队列中并唤醒该Node线程。

# AbstractOwnableSynchronizer

AQS类继承与AbstractOwnableSynchronizer抽象类,AbstractOwnableSynchronizer中定义了setExclusiveOwnerThread,getExclusiveOwnerThread方法 用于设置、获取当前同步器的独占持有线程。

判断独占持有线程在锁(ReentrantLock, ReentrantReadWriteLock)中使用,因为这些锁要实现可重入功能,要判断加锁的线程是否是当前线程。

要注意的是AbstractOwnableSynchronizer中定义的exclusiveOwnerThread字段没有使用volatile声明,get set方法也没有synchronized加锁,所以 get set方法是不保证其他线程的可见性的。这是因为这两个方法主要用于判断这个线程和当前线程是否是同一个,如果是同一个线程,自然不会有可见性问题,如果不是同一个线程,即使有可见性问题,对于判断是否同一个线程也不会导致出错(肯定不相同)。

# juc(java util concurrent包)中的AQS应用

在ReentrantLock, ReentrantReadWriteLock, Semaphore, CountDownLatch, ThreadPoolExecutor.Worker等并发类中都使用了AQS来实现同步器的功能,在对应的类的实现分析中会详细介绍。

# 疑问

# 如果在signalNext之后加入队列,Node会唤醒吗?

要分析一下release和acquire之间可能出现的竞态条件。

对于signalNext之后才创建队列的情况,创建队列完成还会执行一下tryAcquire。 如果signalNext方法执行tryRelease返回true说明其他线程可能acquire成功,则调用signalNext, signalNext如果发现现在队列是空的(head == null或head.next == null 或next.status == 0),则不会唤醒这个Node。

这种情况说明Node还没完成最后的设置status增加WAITING标记位的操作,因为设置完成后,根据happen before,release方法是能读取到的。如果能读取到 release方法就会调用LockSupport.unpark(s.waiter),而LockSupport.unpark的一个特性就是,即使unpark比park在前,之后的第一个park也会立刻唤醒,所以这种情况也不会有liveness(Node无法唤醒)问题

如果release判断if条件时,status还没有设置WAITING标记位,因为执行到if判断了(也就是signalNext方法)这时tryRelease一定已经执行了,释放了release,其他线程可以tryAcquire成功(tryRelease返回true的情况)。 那么在acquire方法中设置完WAITING标记位,下一个循环还会尝试一次tryAcquire,这时tryAcquire应该返回true(除非有其他线程插队barging)线程也入队进行LockSupport.park;如果tryAcquire失败入队进行LockSupport.park挂起, 说明有其他线程acquire成功,那么这个线程在release时还会通过release唤醒first结点。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}
1
2
3
4
5
6
7

# acquire中spins的作用

AQS中等待队列的第一个节点(prev==head)被signalNext唤醒后还需要尝试tryAcquire,如果这时有其他线程(没有入队,不在AQS队列中)调用acquire, 则可能出现插队(这时默认的unfair的策略,能够提升整体的吞吐率,比如锁、Semaphore等),但是如果第一个节点唤醒后被插队再次进入WAITING状态挂起,如果acquire的并发量比较大,下次 唤醒后大概率还可能竞争不过插队的线程,为了避免过于不公平导致AQS队列中的Node太饥饿,AQS中增加了spins自旋等待的策略,进入AQS队列前,AQS会将 spins(默认0)改为2*spins + 1,然后唤醒后,即使tryAcquire失败,这个Node不会继续park,而是进行spins次的Thread.onSpinWait()以及tryAcquire, 如果spins次tryAcquire后,Node还是需要继续park挂起,挂起之前会继续把spins改为(byte)(2 * spins + 1),下次唤醒后继续重试spins的Thread.onSpinWait()+tryAcquire, spins最大值为127,127之后会变成-1,spins为-1时,这个Node不再park(else if (first && spins != 0) --spins),而是一直自旋等待。

# 超时、中断如何实现的

超时机制: acquire方法可以传入超时时间,让超过指定时间后,放弃acquire。这是通过LockSupport.parkNanos方法实现的指定时间的等待,超过这个时间后 线程会自动唤醒,并且下次循环时会判断时间是否超过了最开始acquire时计算的deadline,超过后break退出循环,走到cancelAcquire方法中,取消Node。

        if (!timed)
            LockSupport.park(this);
        else if ((nanos = time - System.nanoTime()) > 0L)
            LockSupport.parkNanos(this, nanos);
        else
            break;
        node.clearStatus();
        if ((interrupted |= Thread.interrupted()) && interruptible)
            break;
    }
}
return cancelAcquire(node, interrupted, interruptible);
1
2
3
4
5
6
7
8
9
10
11
12

中断情况,在一些场景下,我们可能希望线程等待acquire能够被中断避免无限等待,比如ReentrantLock.lockInterruptibly等可中断的方法都是中的 AQS的可中断功能,在AQS的acquire方法中有一个boolean类型的interruptible参数,表示acquire是否可以中断。 其他线程可以被其他线程通过Thread.interrupt实例方法设置中断标识,在AQS等待队列中等待(LockSupport.park)的Node或中断后调用LockSupport.park的线程 都会立刻从park中唤醒返回,并且线程的interrupted状态会设置成true。 这里判断如果能响应中断(interruptible为true)并且Thread.interrupted为true,就break退出循环,退出循环后和超时一样要调用cancelAcquire从队列中删除节点。

if ((interrupted |= Thread.interrupted()) && interruptible)
    break;
1
2

这里要注意的是调动Thread.interrupted()静态方法后,当前线程的中断标记会被清除,所以cancelAcquire中判断了如果是因为中断导致取消但是不是可中断的, 会重新调用Thread.currentThread().interrupt()恢复中断状态,让上层调用代码能够响应中断标识。

private int cancelAcquire(Node node, boolean interrupted,
                              boolean interruptible) {
    // 如果node不为null,要从队列中删除这个node
    if (node != null) {
        // 设置waiter引用为null,帮助GC
        node.waiter = null;
        // 设置状态为CANCELLED
        node.status = CANCELLED;
        if (node.prev != null)
            // prev不为null,说明可能在队列中,调用cleanQueue删除CANCELLED节点
            cleanQueue();
    }
    if (interrupted) {
        if (interruptible)
            return CANCELLED;
        else
            Thread.currentThread().interrupt();
    }
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

注意cleanQueue也是在并发场景下执行的,要考虑线程安全问题。 cleanQueue的代码位于两层循环,定义了p,q,s三个变量,q是指当前遍历的节点,p是q的prev节点,s是q的next节点。 外层循环的推出条件是p == null || (p = q.prev) == null,这个条件说明队列为空或者q已经是head节点,已经遍历完成,cleanQueue完成。 内层循环的遍历中,通过break推出一次清理动作,break完成后再从尾向前遍历循环。

内层退出条件有:

  1. 遍历tail时,tail指针被其他线程修改了(tail != q)
  2. 遍历其他节点时next节点的prev或status被修改(s.prev != q || s.status < 0),说明有其他线程也在清理或将next节点设置为CANCELLED
  3. q.status < 0,找到一个取消的节点,通过cas将这个节点从队列中删除
  4. (n = p.next) != q,
  5. q == null || (p = q.prev) == null,队列遍历完成,会退出整个方法
private void cleanQueue() {
    for (;;) {                               // restart point
        for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
            // 队列遍历完成
            if (q == null || (p = q.prev) == null)
                return;                      // end of list
            // 出现了前后引用出现了不一致状态,是由于其他线程也进行了清理或者有新Node入队,退出当前循环重新遍历
            if (s == null ? tail != q : (s.prev != q || s.status < 0))
                break;                       // inconsistent
            // 找到一个CANCELLED节点
            if (q.status < 0) {              // cancelled
                // 如果是tail,cas修改tail为prev,否则修改next的prev引用为当前节点q的prev p节点
                // cas成功后,再判断下q.prev == p,是
                if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
                    q.prev == p) {
                    // cas更新p的next引用,cas失败也没有关系,因为至少有一个线程能把p的next修改正确。
                    p.casNext(q, s);         // OK if fails
                    if (p.prev == null)
                        // p.prev == null说明p是head节点,说明first节点因为cancel被删除了,需要通过signalNext唤醒新的first节点,避免第一个节点cancel之后链表后面的节点接收不到signal唤醒
                        signalNext(p);
                }
                break;
            }
            // p.next != q,说明执行到这里时,有其他线程正在执行上面的代码清理q节点,已经修改了q的next引用,当前线程会尝试帮助清理,然后break
            if ((n = p.next) != q) {         // help finish
                if (n != null && q.prev == p) {
                    // cas更新next引用
                    p.casNext(n, q);
                    if (p.prev == null)
                        // 同样对于first节点cancel的情况需要重新signalNext唤醒新的first节点。
                        signalNext(p);
                }
                break;
            }
            s = q;
            q = q.prev;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 如何保证节点一定能唤醒(不会无限等待)

不考虑CANCELLED节点的情况下,已经acquire成功的线程,会在release时调用signalNext唤醒first节点(tryRelease返回true的情况下)。 如果节点已经在first位置等待,会被唤醒。如果不在first位置等待,说明还没有入队或者在队列其他位置。 如果在signalNext的这一瞬间,队列为空,则在设置status的WAITINGT状态后、park前,节点是会重新调用tryAcquire尝试的,如果可以acquire不需要park等待。 如果在signalNext的这一瞬间,队列不为空,节点在first位置,自然会被唤醒。 如果在signalNext的这一瞬间,队列不为空,节点不在first位置,则节点需要等待它的前继节点唤醒它。

再考虑下有CANCELLED节点的情况,取消的节点,会调用cleanQueue方法清理队列。清理过程中,如果CANCELLED节点之前是first节点,则会变成head,并且通过signalNext唤醒新的first节点。