# CountDownLatch
CountDownLatch是一种倒计时的latch锁,Latch包含一个初始的数字,线程可以通过await等待Latch打开, 其他线程可以通过countDown给Latch减数字,减到0时,所有等待await的线程会唤醒返回。
适用场景: 等待其他线程或多个线程的执行完成,比如现在线程A要等待线程B执行完成后才执行某些代码,则可以创建一个共享的CountDownLatch对象,count为1, 然后线程A调用CountDownLatch的await,B线程在执行完成后调用CountDownLatch的countDown,就能实现线程AB之间的先后同步协调。
例如如下代码,虽然线程A先start,但是会等待CountDownLatch,线程B后start,sleep1秒后调用countDown,使得当前count剩余1的CountDownLatch释放,A线程从await()中返回继续执行。
final CountDownLatch threadAStartLatch = new CountDownLatch(1);
Thread threadA = new Thread(() -> {
try {
threadAStartLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread A running");
});
threadA.start();
Thread threadB = new Thread(() -> {
System.out.println("Thread B running ");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread B Finished");
threadAStartLatch.countDown();
});
threadB.start();
threadA.join();
threadB.join();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 实现原理
CountDownLatch基于AQS实现,使用state作为count的计数,CountDownLatch的await使用AQS的acquireShared, countDown使用AQS的releaseShared。 使用shared模式是因为await和countDown都支持多个线程同时await成功。
# Sync实现
Sync类是CountDownLatch的AQS子类,实现了tryAcquireShared和tryReleaseShare方法,用state存储剩余的count数。 tryAcquireShared只有在state为0的时候才能成功返回1,其他情况返回-1。 tryReleaseShare通过cas重试给state减一,减到0时可以通知其他线程尝试加锁。
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CountDownLatch在构造函数中传入count数构造Sync类对象
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
2
3
4
5
6
# await实现
await调用AQS(Sync)的acquireSharedInterruptibly方法,传入的参数是1。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
2
3
# countDown实现
countDown方法调用AQS(Sync)的releaseShared方法降低count数。
public void countDown() {
sync.releaseShared(1);
}
2
3