# ReentrantLock
# ReentrantLock介绍
ReentrantLock是java util concurrent包中提供的一个锁实现,相对于synchronized关键字锁, ReentrantLock提供了具备超时时间的tryLock、可中断的lockInterruptibly,并且能够通过newCondition创建多个等待条件。 ReentrantLock能够保证和synchronized一致的内存模型规则(happen before)。 ReentrantLock中的Reentrant的意思是可重入,也就是一个线程对一个锁加锁后还可以在锁中再加这个锁。
# ReentrantLock使用
# 加锁
ReentrantLock实现了Lock接口。 lock的常见使用方式如下,创建完ReentrantLock后,可以对lock对象调用lock方法加锁, 使用时要注意的是要注意将加锁后的代码加上try finally代码块,并在finally中调用lock.unlock()保证最后锁能unlock释放。
lock加锁代码中的逻辑能独占运行,和其他要加锁的代码间,保证原子性、可见性和防止重排序等问题。默认的构造函数创建的是非公平锁(有可能出现插队问题但是整体的吞吐量更高),如果要创建 公平锁,可以通过构造函数ReentrantLock(boolean fair)传入true。
除了基本的lock方法,Lock接口中还提供了其他的加锁方法。
- tryLock: 尝试一次加锁,如果加锁成功返回true可以继续执行代码,注意unlock。否则返回false,表示加锁失败。
- lockInterruptibly: 和lock不同,lockInterruptibly可以响应中断请求,当加锁等待过程中被其他线程中断,lockInterruptibly会抛出InterruptedException
- boolean tryLock(long time, TimeUnit unit): 带有超时等待时间的加锁,如果超过这个时间还没加到锁,会返回false,否则为true。如果等待过程中被中断,抛出InterruptedException
final ReentrantLock lock = new ReentrantLock();
public void m() {
assert lock.getHoldCount() == 0;
lock.lock();
try {
// ... method body
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
# 条件等待
Lock能够创建Condition对象,用来实现类似wait/notify/notifyAll的条件等待通知功能。
Condition对象的等待方法是await(),通知方法是signal/signalAll(注意不要和wait/notify用混,因为wait/notify是每个对象都具有的方法。) 调用await和signal方法需要保证当前在和Condition相同的Lock加锁的前提下,否则会抛出IllegalMonitorStateException异常。 并且条件的状态的读取修改都要在相同的锁的保护下,lock加锁后要在一个while循环中判断条件,不满足条件调用await等待。
可以参考ArrayBlockingQueue中对条件等待通知的经典应用。
ReentrantLock lock = new ReentrantLock()
Condition notFull = lock.newCondition();
public void m() throws InterruptedException {
lock.lock();
try {
while(notFull条件不满足) {
notFull.await();
}
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
# ReentrantLock实现原理
ReentrantLock基于AQS实现(AQS的实现原理参考另一篇AQS文章)。
ReentrantLock的基本实现策略是将AQS的state等于0作为未加锁状态,大于0作为加锁状态,因为ReentrantLock要实现可重入 所以state记录当前锁重入的次数,加锁state+1,释放锁state-1。 当state从1减到0时,说明锁已经release,就会触发AQS中的signalNext机制唤醒可能在AQS等待队列中等待的其他尝试加锁线程。
为了实现非公平锁和公平锁,ReentrantLock类定义了两个AQS的子类,区别是在tryAcquire方法时是否允许插队, 公平锁是不能插队的,所以在执行前要判断当前AQS等待队列中没有有效等待的Node节点。
公平锁和非公平锁因为有很多逻辑相同,所以定义了一个共同的父类Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* tryLock是非公平锁的尝试加锁的实现,如果当前锁没有人使用,则尝试加锁,加锁成功返回true,否则返回false,没有加锁不会阻塞线程。
*/
@ReservedStackAccess
final boolean tryLock() {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取当前的stat
int c = getState();
// 如果state为0,则尝试通过cas修改state来实现加锁
if (c == 0) {
// 使用AQS的compareAndSetState修改state状态为1
if (compareAndSetState(0, 1)) {
// 修改成功说明当前线程获得了锁,设置exclusiveOwnerThread,在后面判断重入时有用
setExclusiveOwnerThread(current);
return true;
}
}
// 如果state不等于0,则判断下是否是重入,即持有锁的线程和当前线程是否是同一个
else if (getExclusiveOwnerThread() == current) {
// 如果是,增加state的值,这里不需要使用cas,是因为在加锁中,其他线程无法对state进行修改,没有原子性问题
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 修改state
setState(c);
return true;
}
// 否则state != 0且已经加锁的不是当前线程,返回false表示尝试加锁失败
return false;
}
/**
* Checks for reentrancy and acquires if lock immediately
* available under fair vs nonfair rules. Locking methods
* perform initialTryLock check before relaying to
* corresponding AQS acquire methods.
*/
/**
* 一次尝试加锁,公平锁和非公平锁提供不同的实现。在lock方法(也包括lockInterruptibly等)中会先调用initialTryLock,如果false才会调用AQS的acquire
* 这个不同于tryAcquire是因为tryAcquire在很多情况下都会调用,而initialTryLock明确在调用acquire之前调用,对于公平锁和tryAcquire有区别,后面会讲到
*/
abstract boolean initialTryLock();
@ReservedStackAccess
final void lock() {
// 先第一次initialTryLock
if (!initialTryLock())
// initialTryLock不成功再acquire
acquire(1);
}
@ReservedStackAccess
final void lockInterruptibly() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!initialTryLock())
acquireInterruptibly(1);
}
@ReservedStackAccess
final boolean tryLockNanos(long nanos) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return initialTryLock() || tryAcquireNanos(1, nanos);
}
// 定义公平锁和非公平锁一致的tryRelease释放方法, ReentrantLock的unlock方法会调用AQS的release(1),然后会调用到这里的tryRelease方法,参数是1。
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
// 获取当前的state,计算减去之后的state值
int c = getState() - releases;
// 判断下当前线程是否已经加了这个锁,如果没有抛出异常
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
// state减到0时,会释放整个锁,需要清除exclusiveOwnerThread
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
// 修改state值,整个方法不需要考虑原子性问题,因为是还在加锁状态下进行的,执行过程中,其他线程无法修改state值。
setState(c);
return free;
}
// 判断同步器是否是被当前线程正独占持有
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 创建ConditionObject的方法
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# 非公平锁实现
先看下非公平锁的实现,非公平锁不需要考虑先到后到的问题,只需要在state为0的情况下cas就可以了,并且在第一次尝试获取锁时判断下锁的线程是否是当前线程,如果是进行重入。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// initialTryLock实现,在lock方法中xian调用initialTryLock,不成功调用tryAcquire
final boolean initialTryLock() {
// 获取当前线程
Thread current = Thread.currentThread();
// 先cas,从0到1
if (compareAndSetState(0, 1)) { // first attempt is unguarded
// 如果成功,加锁成功,修改exclusiveOwnerThread
setExclusiveOwnerThread(current);
return true;
}
// cas失败判断下线程是否同一个
else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
// cas失败并且也不是重入场景,返回false
return false;
}
/**
* tryAcquire方法,会在调用完initialTryLock(返回false)之后调用acquire方法,在acquire方法中调用
* 走到这里,肯定能够判断不是重入的场景了,所以不需要在判断exclusiveOwnerThread是否是当前线程,能够提升性能
*/
protected final boolean tryAcquire(int acquires) {
// 判断state为0的前提下进行cas从0到1
if (getState() == 0 && compareAndSetState(0, acquires)) {
// 如果成功,修改exclusiveOwnerThread为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 公平锁实现
公平锁要保证在队列中的Node优先加锁的原则。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* lock方法中第一次调用,此时当前线程没有加入到AQS等待队列中,所以判断优先级只需要判断AQS队列中是否有有效的Node即可
*/
final boolean initialTryLock() {
Thread current = Thread.currentThread();
// 获取当前state
int c = getState();
// state为0时尝试加锁
if (c == 0) {
// 判断AQS队列是否有等待的线程,如果没有通过cas尝试修改state状态
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
// 获取成功设置exclusiveOwnerThread
setExclusiveOwnerThread(current);
return true;
}
}
// 判断是否是相同线程,重入的情况
else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
/**
* initialTryLock失败后,lock方法会调用acquire,acquire中多个时机都会调用tryAcquire
*/
protected final boolean tryAcquire(int acquires) {
// tryAcquire先判断state为0,还需要判断队列中是否有比当前Node更靠前的Node,因为可能有其他线程在当前线程之前把Node加入到AQS队列中了
// 在判断AQS队列中当前没有更靠前的Node后,尝试cas
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 成功后修改exclusiveOwnerThread
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
AQS中的hasQueuedThreads()方法实现,从tail向前遍历(原因在AQS文章中进行了详细分析),如果找到一个status >=0的Node(非head),说明队列中有有效节点返回true,否则返回false
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
if (p.status >= 0)
return true;
return false;
}
2
3
4
5
6
AQS中的hasQueuedPredecessors()方法返回AQS队列中是否有比当前线程更靠前的节点。
实现方案为尝试获取first节点(head的next),判断first节点的线程是否和当前线程相同。
从head开始获取first节点更加快速(不需要像tail那样遍历)但是head节点判断可能会因为头结点更换等因素影响数据一致性。 这些情况下都需要从tail向前遍历查找,因为如果当前线程在队列中,从tail向前一定能找到当前的线程,并且当前线程不会从队列中被移出(没有从head开始查找的问题)
- 如果head为null,说明现在队列还没有创建,返回false(此时first是null)
- 如果head不是null,head.next是null,说明队列已经初始化完成,但是可能这个head已经出队不再是最新的head
- 如果head.next不是null,说明队列中有节点了,但是s.waiter是null,说明有其他线程正在修改head,s从first节点变成head了
- 如果s.waiter不是null,但是s.prev是null,说明s变成了新的head节点,first节点也发生了变化。
前面这些情况下,都需要再从tail向前遍历一遍(FIXME WHY),找到first节点,通过调用getFirstQueuedThread()获得first节点的线程, 并和当前线程比较。
public final boolean hasQueuedPredecessors() {
Thread first = null; Node h, s;
if ((h = head) != null && ((s = h.next) == null ||
(first = s.waiter) == null ||
s.prev == null))
first = getFirstQueuedThread(); // retry via getFirstQueuedThread
return first != null && first != Thread.currentThread();
}
2
3
4
5
6
7
8
public final Thread getFirstQueuedThread() {
Thread first = null, w; Node h, s;
// if判断还是和hasQueuedPredecessors中的一样
if ((h = head) != null && ((s = h.next) == null ||
(first = s.waiter) == null ||
s.prev == null)) {
// 从tail向前查找,找到最后一个waiter不是null的节点,将这个waiter返回
// traverse from tail on stale reads
for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
if ((w = p.waiter) != null)
first = w;
}
return first;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# jdk17相对jdk8版本的变化
# 不区分第一次尝试加锁和之后的尝试加锁
在jdk8中,lock方法调用acquire,acquire中的每次调用tryAcquire都需要判断是否重入、公平锁是否有靠前的节点。
jdk17版本的实现中区分了第一次和之后的尝试加锁,可以针对各自进行优化,比如之后的tryAcquire不再判断exclusiveOwnerThread是否是自己等。
# hasQueuedThreads变化
jdk8中的hasQueuedThreads只判断了 head != tail
,而jdk17中还额外对节点的status进行了判断过滤掉CANCELLED节点。
jdk8版本
public final boolean hasQueuedThreads() {
return head != tail;
}
2
3
jdk17版本
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
if (p.status >= 0)
return true;
return false;
}
2
3
4
5
6
# hasQueuedPredecessors()方法变化
jdk17版本的hasQueuedPredecessors方法相比下面Jdk8版本的更加严谨,对更多的并发情况进行了判断处理。
比如下面jdk8代码中,如果在执行 h != t时,当前线程的Node是first.next,在执行到s = h.next时,旧的head已经出队, 所以head.next返回null,则(s = h.next) == null会返回true,此时其实当前线程已经是first节点了,hasQueuedPredecessor并不准确。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
2
3
4
5
6
7
8
9
10