# ReentrantReadWriteLock

img.png

ReentrantReadWriteLock是jdk提供的读写锁,读写锁类似mysql中的读写锁概念,读锁与读锁之间可以共存,但是写锁和读锁写锁之间都不能同时加锁。 读写锁适用于在一些读多写少的场景下代替ReentrantLock,让读与读之间可以并发,比如现在有一个线程不安全的数据结构,读多写少,我们可以使用ReentrantReadWriteLock, 在读取时加读锁,修改数据时加写锁。

# 使用方式

ReentrantReadWriteLock的使用方式为,首先创建一个共享的ReentrantReadWriteLock对象,然后通过这个对象的readLock和writeLock方法分别获取读锁和写锁。 这两个锁都实现了Lock接口,可以使用lock,unlock,tryLock等方法,需要注意的是unlock需要放到加锁代码的finally块中保证锁最终能释放。

锁降级: 如果一个线程已经拿到了写锁,是可以继续加读锁的,并且可以在释放读锁之前释放写锁,那么持有的锁就会降级为读锁,其他线程的读锁可以加锁。 但是反过来是不行的,也就是一个线程加了读锁,是不能再加写锁的,是为了避免出现死锁的情况(两个加了读锁的线程都在尝试加写锁,但是写锁由被各自的读锁阻塞)类似mysql中的读写锁死锁)。

公平锁: 默认情况(默认的构造函数)下读写锁是非公平锁,不能保证先到先得,有可能出现插队的情况,吞吐量更高,如果希望使用公平锁,可以通过构造函数制定fair为true。

Condition: 读写锁的写锁也提供和ReentrantLock一样的newCondition方法创建条件对象,在加了写锁后可以进行条件判断条件等待条件通知。

private final Map<String, Data> m = new TreeMap<>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();

public Data get(String key) {
 r.lock();
 try { return m.get(key); }
 finally { r.unlock(); }
}
public Data put(String key, Data value) {
 w.lock();
 try { return m.put(key, value); }
 finally { w.unlock(); }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 实现原理

ReentrantReadWriteLock和ReentrantLock一样基于AbstractQueuedSynchronizer实现,每个ReentrantReadWriteLock内部有一个AQS对象,

读锁和写锁分别通过shared mode(共享模式)和exclusive mode(独占模式)进行acquire和release。

ReentrantReadWriteLock使用AQS中的state存储读写锁的各自当前加锁次数(包括锁的重入次数和读锁的所有线程的进入次数)。

state是int字段,ReentrantReadWriteLock将state分为左右各16bit的部分来分别存储,左边(也就是高位)存储读锁当前持有数量,右边(也就是低位)存储写锁当前持有数量。

# ReentrantReadWriteLock构造函数和核心字段

ReentrantReadWriteLock中包含了sync, readLock, writeLock三个字段,sync是读写锁中AQS实现类的对象实例, readLock和writerLock对象通过各自构造函数传入sync对象构造,readLock和writeLock的方法都会委托给sync来实现

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    
    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync;

    public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

readLock和writeLock两者的区别是使用的获取释放方法不同,例如读锁使用的acquireShared、releaseShared等shared模式的方法,写锁使用的是acquire、release等exclusive模式的方法。 读写锁各自对于公平锁的实现也有区别,因为读锁和读锁间可以同时加锁。

# WriteLock实现

WriteLock的lock方法实现为调用sync的acquire(1),表示AQS获取,稍后分析Sync类的实现。 unlock方法调用sync的release(1),表示AQS释放。其他的lockInterruptibly等也都对应于sync中的实现。

public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquire(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryWriteLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# ReadLock实现

ReadLock和WriteLock的注意区别是,Lock的各个方法使用sync的shared模式的方法,比如acquireShared,releaseShared等,并且ReadLock不支持创建ConditionObject

public static class ReadLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquireShared(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryReadLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# Sync类实现

Sync类继承于AbstractQueuedSynchronizer,并且为了支持公平锁和非公平锁还有FairSync和NonfairSync两个子类

Sync类的state字段(继承于AQS),高16位存储读锁的持有次数sharedCount,低16位存储写错的持有次数exclusiveCount。

先定义了计算这两个计数的常量和方法,另外因为16位的限制,读锁最多同时加65535个(包含重入),写锁最多重入65535次。

abstract static class Sync extends AbstractQueuedSynchronizer {

    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = (1 << SHARED_SHIFT);
    static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // 通过state计算读锁持有次数
    static int sharedCount(int c) {
        return c >>> SHARED_SHIFT;
    }

    // 通过state计算写锁持有次数(重入次数)
    static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }
    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

接下来Sync类定义了用于实现维护当前线程读锁加锁次数的字段和类(HoldCounter)。 这个计数是当前线程持有的读锁的次数,是每个线程独有的。用于在fullTryAcquireShared方法中判断是否重入,重入的不能进入AQS等待。 为了实现这个计数,Sync增加了一个ThreadLocal的变量readHolds,同时因为很多场景大多数情况下锁冲突都是比较少的, 所以又增加了一个firstReader和firstReaderHoldCount来进行优化,如果当前线程是第一个加读锁的线程(把shared count从0变成1的线程),则使用firstReaderHoldCount作为getReadHoldCount返回。

同时要注意的是readHolds,cachedHoldCounter,firstReader,firstReaderHoldCount这几个字段都没有使用volatile声明,是因为

  1. readHolds是一个ThreadLocal变量,只需要保证创建完对象后的可见性即可,即该对象创建完成后就不会变化,ThreadLocal变量是作为Thread中的threadLocalMap的key使用的
  2. firstReader和firstReaderHoldCount用于优化在无竞争的情况下的readHoldCount实现,无竞争时使用这两个变量而不是用ThreadLocal,能够节省ThreadLocal的内存开销和查找消耗。这两个变量只用于当前线程是firstReader的情况,因此即使出现可见性问题,也不会影响当前线程,因为一个线程内是不会出现自己的可见性问题的,并且在release时当前线程会把firstReader设置为null。
  3. cachedHoldCounter同样是用于减少ThreadLocal调用的优化,如果最后一个acquire readLock成功的线程是下一个调用release的线程(类似一个栈),可以通过缓存最近使用的HoldCounter对象减少ThreadLocal调用,HoldCounter中保存了threadId,不需要volatile是因为线程只会使用属于自己线程的计数,如果cacheHoldCounter因为读取可见性问题,肯定不是当前线程的holdCounter,则会使用ThreadLocal兜底

在Sync的构造函数中,会在创建完readHolds这个字段的对象后,调用setState(getState),通过state的volatile写屏障,保证后面调用到readHolds时,都能读到readHolds的值,这是因为后续的cachedHoldCounter读取之前都会先读取state,state是volatile读,整体保证了happen before的关系。

    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    private transient ThreadLocalHoldCounter readHolds;

    private transient HoldCounter cachedHoldCounter;

    private transient Thread firstReader;
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# WriteLock加锁的tryAcquire实现

WriteLock加锁时,会调用Sync也就是AQS的acquire,acquire执行过程中 会有多个位置调用tryAcquire表示尝试加锁,传入的acquires参数是1.

tryAcquire的实现逻辑是

  1. 如果现在read count不为0,是不能获取成功的,因为读锁不能升级为写锁(防止死锁)
  2. 如果read count = 0,write count != 0, 且AQS独占持有线程不是当前线程,说明其他线程正在持有锁,当前线程无法加写锁,返回false。
  3. 如果read count = 0,write count != 0,AQS独占持有线程是当前线程,说明是写锁重入,要判断下重入次数,不能超过16bit的限制,没有超过则保存新state返回true
  4. 其他情况,read count = 0, write count = 0, 会先调用writerShouldBlock方法判断是否要阻塞(给公平锁实现留的逻辑),如果不阻塞通过cas修改state值, cas成功则表示acquire能成功、设置exclusiveOwnerThread,否则返回false。
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // 获取state值
    int c = getState();
    // 从state中获取低16位的write count
    int w = exclusiveCount(c);
    if (c != 0) {
        // c != 0,说明write count和read count至少一个不等于0,如果w = 0,说明read count != 0,读锁不能升级为写锁,返回false
        // 如果w != 0,且当前线程不是exclusiveOwnerThread,说明其他线程在持有写锁,返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 说明 write count != 0且当前线程等于exclusiveOwnerThread,则要判断writeCount加acquires后下是否溢出16bit的表示范围
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 没有溢出保存state,因为当前read count = 0, write count在低16位,所以直接加acquires就可以, 返回true
        setState(c + acquires);
        return true;
    }
    // 这时说明c = 0, write count = 0
    // writerShouldBlock是留给公平锁的实现,公平锁要判断当前AQS队列中是否有更靠前的Node,非公平锁直接返回false不需要谦让。
    if (writerShouldBlock() ||
        // 调用cas
        !compareAndSetState(c, c + acquires))
        // should block或cas失败都返回false表示尝试加锁失败
        return false;
    // cas成功,写入exclusiveOwnerThread
    setExclusiveOwnerThread(current);
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# WriteLock锁释放的tryRelease实现

WriteLock在释放锁调用unlock时,会调用Sync(也就是AQS)的release方法,release方法会调用tryRelease方法, tryRelease如果返回true,说明其他线程可以尝试tryAcquire了,会唤醒AQS的first节点(head的next,这时如果当前线程之前在队列中已经出队成为head)

tryRelease会给state减1(因为write count在低16位),如果减去后state为0,说明锁已经释放,其他线程可以加读锁或写锁。

protected final boolean tryRelease(int releases) {
    // 检查下exclusiveOwnerThread是否是当前线程
    if (!isHeldExclusively())
        // 不是抛出异常,说明没加锁成功就调用unlock
        throw new IllegalMonitorStateException();
    // 对state减去release得到要设置的state值,因为write count在低位,read count不变,所以直接减就可以
    int nextc = getState() - releases;
    // 如果nextc变成0,说明锁已经释放,其他的读锁或写锁的加锁请求都可以继续tryAcquire了
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        // 如果释放,设置exclusiveOwnerThread为null
        setExclusiveOwnerThread(null);
    // 设置state值,不需要cas是因为整个tryRelease是在独占锁的加锁保护下
    setState(nextc);
    return free;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# ReadLock加锁的tryAcquireShared实现

ReadLock的加锁lock方法会调用Sync(AQS)的acquireShare的方法,acquireShared方法执行过程中会调用tryAcquireShared, AQS中约定的tryAcquireShare方法的返回值含义为, 负数表示为获取成功,0表示当前线程获取成功,但是不会继续通知其他的shared mode节点来尝试tryAcquire,正数表示其他线程还可以tryAcquire,或通知下一个shared模式的节点来tryAcquire。

  1. 如果当前是其他线程加的写锁,返回-1
  2. 其他情况说明当前线程有资格加锁(要么没有加锁,要么现在加的是读锁),则会判断readerShouldBlock,公平锁会在readerShouldBlock判断前面是否有等待加write lock的Node节点。 然后会判断read count防止超过16bit,都没问题后通过cas修改state
  3. 如果readerShouldBlock返回true或read count超限或cas失败,都会调用fullTryAcquireShared做进一步判断。

cas修改成功之后,会为getReadHoldCount方法计算线程维度的读锁次数

如果当前线程把state中read count从0改成1,则设置firstReader和firstReaderHoldCount来计数; 如果不是0改成1,则判断是否和firstReader是一个线程,如果是复用firstReaderHoldCount计数; 不是firstReader的情况,则会使用ThreadLocal创建HoldCounter变量,并缓存到cachedHoldCounter字段中。

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // write 不等于0并且exclusiveOwnerThread不是当前线程,说明其他线程加了写锁,读锁不能加成功,返回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 获取read count
    int r = sharedCount(c);
    // 如果不需要block,并且计数小于MAX_COUNT并且cas修改shared count成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 如果cas成功前shared count是0
        if (r == 0) {
            // 使用firstReader保存当前线程
            firstReader = current;
            // 使用firstReaderHoldCount保存当前线程的reader计数
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 如果shared count之前不是0,但是线程和firstReader是同一个,使用firstReaderHoldCount计数加加
            firstReaderHoldCount++;
        } else {
            // 读取一下HoldCounter缓存
            HoldCounter rh = cachedHoldCounter;
            // 如果缓存是null或者缓存的线程不是当前线程,则把当前线程的ThreadLocalHoldCounter的HoldCounter取出来,保存给cachedHoldCounter
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                // 如果当前线程threadLocalMap中没有readHolds,调用readHolds.get()时会initialValue初始化创建一个new HoldCounter
                cachedHoldCounter = rh = readHolds.get();
            // 如果cachedHoldCounter不是null并且线程也是当前线程,但是计数是0,则会把cachedHoldCounter设置到threadLocal中
            else if (rh.count == 0)
                readHolds.set(rh);
            // 给当前线程的HoldCounter这个ThreadLocal中保存的对象的count值++
            rh.count++;
        }
        // 返回1表示其他shared mode的Node还可以tryAcquireShared
        return 1;
    }
    // 其他情况调用fullTryAcquireShared进行进一步判断
    return fullTryAcquireShared(current);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

fullTryAcquireShared方法中,会不断重试

退出时机有

  1. 发现已经加了写锁,且线程不是当前线程,返回-1
  2. share count超出16bit,抛出异常
  3. readerShouldBlock返回true,要么是非公平锁模式下有exclusive node在first节点等待,要么是公平锁模式下有其他Node在AQS中排队等待,这两种情况如果当前线程不是重入,都会返回-1
  4. cas成功,返回1

其他情况都会继续for循环重试。其中维护HoldCounter的逻辑和tryAcquireShared中类似。

为什么读锁重入时不能到AQS队列中等待呢?想象一下现在一个线程已经加了读锁,然后再加一次读锁,如果这时因为各种原因tryReleaseShared返回-1,则最终会加入到AQS队列尾部, 假设AQS队列前面有一个等待加写锁的exclusive node,则出现了死锁,因为当前线程加不到读锁一直等待,等待有人唤醒AQS中的刚加入的shared模式的node,但是这个node在尾部,只有在前面没有exclusive node时才会唤醒, 但是exclusive node无法出队,因为有线程一直加着读锁,这个线程的死锁无法加锁,由此形成了死锁。

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    // 循环中重试,是因为cas失败情况下说明读锁出现并发,读与读之间是不需要阻塞的,不需要到AQS队列中等待,通过for循环等待下次cas即可,除非出现readerShouldBlock为true等情况
    for (;;) {
        // 获取state
        int c = getState();
        // 如果有写锁
        if (exclusiveCount(c) != 0) {
            // 如果不是当前线程加的写锁,返回-1
            if (getExclusiveOwnerThread() != current)
                return -1;
            // 如果是当前线程加的写锁,不能到AQS队列中等待,因为可能发生死锁(已经加了写锁,在等待加读锁,但是读锁放到AQS中等待,会一直不能被通知到),需要走到后面cas
        } else if (readerShouldBlock()) {
            // 没有加写锁的情况,判断readerShouldBlock,公平锁判断先到先得、非公平锁防止死锁饥饿
            if (firstReader == current) {
                // 如果firstReader是当前线程,说明current不为null,firstReaderHoldCount肯定大于0(因为release时如果count是0会设置current为null),说明是读锁重入,不能进入AQS队列中
            } else {
                // 其他情况,获取下holdCount
                if (rh == null) {
                    // 先读cachedHoldCounter
                    rh = cachedHoldCounter;
                    // 如果cachedHoldCounter是null或者不是当前线程的缓存,则从ThreadLocal中获取
                    if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current)) {
                        rh = readHolds.get();
                        // 如果count为0,从ThreadLocal中删除,减少内存
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 如果count为0,说明不是重入场景,返回-1,可以加到AQS队列中
                if (rh.count == 0)
                    return -1;
            }
        }
        // 判断share count将要超过16bit的情况,不能cas,抛出异常
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 进行cas尝试
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // cas成功,更新hold count统计
            if (sharedCount(c) == 0) {
                // 如果share count之前是0,则设置当前线程为firstReader,初始化firstReaderHoldCount为1
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 当前线程是firstReader,给firstReaderHoldCount加1
                firstReaderHoldCount++;
            } else {
                // 其他情况,判断cachedHoldCounter是否可以使用
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    // 如果cachedHoldCounter为null或不是当前线程的缓存,则使用TheadLocal
                    rh = readHolds.get();
                // 如果cachedHoldCounter不是null且是当前线程的HoldCounter(tid相同),且count为0
                else if (rh.count == 0)
                    // 设置到ThreadLocal中
                    readHolds.set(rh);
                // HoldCounter对象的count加一
                rh.count++;
                // 更新cachedHoldCounter为当前使用的HoldCounter对象,在release方法中有概率使用到这个缓存,能减少ThreadLocal的调用
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
        // cas失败,继续for循环重试
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

# ReadLock释放锁tryReleaseShared实现

ReadLock在unlock时会调用Sync(AQS)的releaseShared方法,releaseShared方法会先调用tryReleaseShare, 如果返回true会调用signalNext唤醒first节点,注意的是如果firstNode节点是不需要唤醒的,因为如果first节点是shared Node则在acquire成功的时候就会调用signalNextIfShared,会把下一个first节点唤醒。 所以这里的tryReleaseShared只需要处理释放锁后,read count为0的时候唤醒exclude模式等待的node也就是等待加写锁的线程就可以了

tryReleaseShared方法前部分调整readCount计数,要给计数减一,计数为了优化可能存储在多个地方。 接下来会通过cas修改shared count,不过和WriteLock释放锁不同,ReadLock因为多个线程是可以同时对读锁进行加锁释放锁的,所以tryReleaseShared是可能并发执行的,要通过循环cas重试修改state值。

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 修改holdCount
    // 如果当前是firstReader,则数据保存在firstReaderHoldCount中
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        // 如果firstReaderHoldCount为1,说明firstReader线程已经全部释放锁了,下次加锁之前,当前线程都不再会使用firstReaderHoldCount,设置firstReader为null
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            // 否则firstReaderHoldCount--
            firstReaderHoldCount--;
    } else {
        // 如果不是firstReader,则先从cachedHoldCounter读取
        HoldCounter rh = cachedHoldCounter;
        // 如果cachedHoldCounter没有数据或不是当前线程的缓存数据,则从ThreadLocal中读取
        if (rh == null ||
            rh.tid != LockSupport.getThreadId(current))
            rh = readHolds.get();
        // 读取到HoldCounter对象后,获取计数
        int count = rh.count;
        if (count <= 1) {
            // 如果计数=1,说明释放锁后,对象可以释放,因为计数变成0了,调用readHolds.remove删除ThreadLocal(threadLocalMap中的key value)
            readHolds.remove();
            // 如果count <=0 说明unlock调用次数多了,比如加锁一次但是释放锁两次
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        // 其他情况,HoldCounter对象还需要保存,对count值减一
        --rh.count;
    }
    // 修改state值,因为读锁释放可能出现并发,所以要用cas并且失败循环重试
    for (;;) {
        // 获取当前state
        int c = getState();
        // 对share count减一,因为share count在state中的高位,所以要减去SHARE_UNIT
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 如果cas成功,则返回share count是否减到0,减到了0,说明读锁和写锁都已经释放,可以唤醒下一个exclusive模式的node。
            return nextc == 0;
        // cas失败则重试
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 公平锁与非公平锁

在获取锁的过程中读写锁分别调用了readerShouldBlock和writerShouldBlock方法,这两个方法是预留给实现公平锁的,以及防止非公平锁下写锁饥饿的。

# 非公平锁

非公平锁加写锁时,返回false,写锁总是尝试插队。对于加读锁,会谦让一下已经在AQS队列中first位置(head的next)排队的write lock,防止在读锁并发量远大于写锁时写锁一直加不到锁的饥饿问题。

final boolean writerShouldBlock() {
    // 写锁总是尝试插队
    return false; // writers can always barge
}
final boolean readerShouldBlock() {
    // 读锁调用apparentlyFirstQueuedIsExclusive
    return apparentlyFirstQueuedIsExclusive();
}
// 判断AQS队列中的first(head的next)是exclusive模式的node
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    // 如果有head且head有next且next不是shared node且waiter不为null,因为可能有并发出队,出队后旧的head对象会设置字段为null,所以这些判断都是需要的
    // 如果有写锁因为加不上锁等待在first节点,则这个判断会为true。
    return (h = head) != null && (s = h.next)  != null &&
        !(s instanceof SharedNode) && s.waiter != null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 公平锁

公平锁讲究先到先得,不插队,所以acquire时,和ReentrantLock一样,判断AQS中有没有靠前的节点,如果有,block方法则返回true,不过block即使返回true,在tryReleaseShared也会判断是否是重入场景避免死锁,在前面已经介绍。

static final class FairSync extends Sync {
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}
1
2
3
4
5
6
7
8