AQS之CountDownLatch源码解析
阅读数: 次
前言:
❝「CountDownLatch(倒计时器)」 是JDK并发包下的一个同步工具类,其内部是依赖于AQS(AbstractQueuedSynchronizer)的 共享锁(共享模式)。
注意: 建议阅读本文前,可以先看看这篇文章《AQS之ReentrantLock源码解析》,这篇文章中对AQS进行了简要介绍,有助于快速理解本文。
❞
本文主线:
CountDownLatch 应用场景 CountDownLatch 源码分析
应用场景:
❝针对于 CountDownLatch 倒计时器, 一种典型的场景就是类似于 「火箭发射」 ;在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检测,只有等到所有的检查完毕且没问题后,引擎才能点火。
那么在检测环节中多个检测项可以同时并发进行的,只有所有检测项全部完成后,才会通知引擎点火的,这里就可以使用 CountDownLatch 来实现。
CountDownLatch 到底是怎么实现的呢?别着急,「模拟代码」奉上:
❞
模拟代码:
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @PACKAGE_NAME: com.lyl.aqs
* @ClassName: SimulateRocketLaunchDemo
* @Description: 使用 CountDownLatch 模拟火箭发射过程
* @Date: 2020-05-31 14:17
**/
public class SimulateRocketLaunchDemo implements Runnable{
// 设置了 10 个检测项
static final CountDownLatch latch = new CountDownLatch(10);
static final SimulateRocketLaunchDemo demo = new SimulateRocketLaunchDemo();
@Override
public void run(){
// 模拟检查任务
try {
Thread.sleep(new Random().nextInt(10) * 1000);
System.out.println(Thread.currentThread().getName().split("-")[3]
+ " check complete !");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//计数减一
//放在finally避免任务执行过程出现异常,导致countDown()不能被执行
latch.countDown();
}
}
// test
public static void main(String[] args) throws InterruptedException {
// 设置线程数为10的固定线程池
ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i=0; i<10; i++){
// 提交任务
exec.submit(demo);
}
// 等待检查,只有当10个检测项全部检测完成后,才会唤醒处于等待状态的main主线程,让其继续执行
latch.await();
// 发射火箭
System.out.println("Fire!");
// 关闭线程池
exec.shutdown();
}
}
❝再提供一个CountDownLatch 的实际应用的例子,「传送门」:https://blog.csdn.net/feichitianxia/article/details/97814645
❞
源码分析:
❝由于CountDownLatch 内部的实现是依赖于AQS的 「共享锁(共享模式)」 的,所以在阅读其源码前,需要对AQS的基础有一定的了解,否则源码看起来会相对困难很多;
如果对AQS一点也不了解的话,可以先阅读下 《AQS之ReentrantLock源码解析》 文章了解下AQS,这样在后面分析CountDownLatch 源码时会简单些;
❞
什么是共享锁、排它锁?
共享锁:允许多个线程可以同时获取一个锁; (CountDownLatch 使用的共享锁) 排它锁:一个锁在同一时刻只运行一个线程拥有;(ReentrantLock 使用的排它锁)
1、接下来主要分析CountDownLatch的这几个方法:
❝通过上图中的CountDownLatch 几个方法深入其源码进行分析;
❞
2、构造方法 new CountDownLatch(10) :
public CountDownLatch(int count) {
if (count < 0) {
throw new IllegalArgumentException("count < 0");
}
// CountDownLatch内部维护了Sync内部类,内部类继承了AQS父类
this.sync = new Sync(count);
}
①、接下来看看 Sync 类的构造方法:
Sync(int count) {
/**
* setState()方法是AQS提供的state变量的写方法, state变量被volatile修饰,由于volatile的
* happen-before规则,被 volatile 修饰的变量单独读写操作具有原子性
*/
setState(count);
}
然后在看看AQS提供的setState(int newCount) 方法 和 state变量:
/**
* The synchronization state.
*/
private volatile int state;
protected final void setState(int newState) {
state = newState;
}
3、CountDownLatch的 getCount( ) 方法:
public long getCount() {
// 调用 sync 内部类的getCount()方法
return sync.getCount();
}
①、Sync 内部类的getCount( ) 方法:
int getCount() {
// Sync 调用其父类AQS的 getState()方法
return getState();
}
AQS的getState()方法:
/**
* The synchronization state.
*/
private volatile int state;
protected final int getState() {
// 返回state同步状态值
return state;
}
4、CountDownLatch 的 countDown( ) 方法:
public void countDown() {
// 调用Sync内部类的父类AQS的 releaseShared()共享锁释放模版方法
sync.releaseShared(1);
}
①、AQS的 releaseShared( ) 方法:
public final boolean releaseShared(int arg) {
/**
* tryReleaseShared()方法是尝试释放锁,这个方法在AQS的子类Sync进行了重写
*/
if (tryReleaseShared(arg)) {
/**
* 如果tryReleaseShared()方法尝试释放锁成功,并且此时state同步状态变量值为0时,
* 则执行doReleaseShared方法,将在同步队列中阻塞的线程唤醒
*/
doReleaseShared();
return true;
}
return false;
}
②、CountDownLatch 的 tryReleaseShared( )方法:
protected boolean tryReleaseShared(int releases) {
// for(;;) 与 while(true) 一样的死循环
for (;;) {
// 获取state同步变量值
int c = getState();
// 如果state同步变量值已经是0,则返回false
if (c == 0)
return false;
// 将state同步变量值进行减一
int nextc = c-1;
// 使用AQS提供的CAS算法方法更新state变量值
if (compareAndSetState(c, nextc))
// 如果nextc等于0,代表此时state同步变量值为0了,返回true
return nextc == 0;
}
}
③、AQS提供的 doReleaseShared( ) 方法: 唤醒同步队列中阻塞的线程
「Node节点的四种状态值请参考文章:《AQS之ReentrantLock源码解析》」
private void doReleaseShared() {
for (;;) {
// head同步队列中的队列头
Node h = head;
if (h != null && h != tail) {
/**
* 获取head节点的状态,AQS中的Node内部节点类中定义了四种状态值
* 四种状态值请参考上面 ↑ 文章
*/
int ws = h.waitStatus;
/**
* SIGNAL是四中状态值之一:表示当前节点中的线程可以尝试被唤醒
*/
if (ws == Node.SIGNAL) {
// 将节点的状态使用CAS算法更新为0,0表示初始化状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 状态更新0失败,则进行下次循环
continue;
// 状态成功更新为0后,唤醒节点中的线程,此方法具体源码可参考上面 ↑ 文章
unparkSuccessor(h);
}
/**
* 如果节点状态值为0,则使用CAS方法更新节点状态值为 Node.PROPAGATE
* PROPAGATE 是四中状态值之一:该状态表示可运行,只在共享模式下使用
*/
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
// 跳出循环
break;
}
}
5、CountDownLatch 的 await( ) 方法:
❝await( ) 方法:
❞
当state状态变量值不为0时,就一直将线程(main主线程)阻塞在同步队列中; 当state变量值为0时,会尝试将线程唤醒,并将唤醒操作传播下去。
public void await() throws InterruptedException {
// 调用Sync内部类的父类AQS的模版方法 acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}
①、AQS的模版方法 acquireSharedInterruptibly(1) 方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
/**
* interrupted()判断当前线程是否被中断,注意:此方法会默认清除线程的中断标志
*/
if (Thread.interrupted())
throw new InterruptedException();
/**
* tryAcquireShared()尝试访问共享锁,如果state同步状态变量值不为0,则返回-1
*/
if (tryAcquireShared(arg) < 0)
/**
* 将阻塞的线程创建Node节点,绑定节点类型为共享模式,并将创建的节点加入同步队列的队尾
* 并且当新创建的Node节点的前驱结点为head时,就会尝试唤醒下一个节点中的线程
*/
doAcquireSharedInterruptibly(arg);
}
②、AQS提供的 doAcquireSharedInterruptibly( ) 方法:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 创建新Node节点,绑定共享模式,并将其插入到队尾
final Node node = addWaiter(Node.SHARED);
// failed是中断标志位
boolean failed = true;
try {
for (;;) {
// 返回当前节点的前驱结点
final Node p = node.predecessor();
if (p == head) {
// 判断当前state同步变量值是否为0,不是0返回-1,是0返回1
int r = tryAcquireShared(arg);
// 如果 r大于0,表示state变量值为0
if (r >= 0) {
// 将当前节点设置head队列头,并且尝试唤醒同步队列中阻塞的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
/**
* shouldParkAfterFailedAcquire()是对当前节点的前驱结点的状态进行判断,以及去针对各种
* 状态做出相应处理,由于文章篇幅问题,具体源码本文不做讲解;只需知道如果前驱结点p的状态为
* SIGNAL的话,就返回true。
*
* parkAndCheckInterrupt()方法会使当前线程进去waiting状态,并且查看当前线程是否被中断,
* interrupted() 同时会将中断标志清除。
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
/**
* 如果for(;;)循环中出现异常,并且failed=false没有执行的话,cancelAcquire方法
* 就会将当前线程的状态置为 node.CANCELLED 已取消状态,并且将当前节点node移出
* 同步队列。
*/
cancelAcquire(node);
}
}
③、AQS提供的 setHeadAndPropagate( ) 方法:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置为队首
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果s节点是共享模式的,则调用doReleaseShared()方法
if (s == null || s.isShared())
// 唤醒阻塞在同步队列中的线程
doReleaseShared();
}
}
❝-- END --
本文解析 CountDownLatch 源码已经写完了,如果大家在看的时候,有些地方没看明白的话,请务必先将这篇文章 《AQS之ReentrantLock源码解析》 阅读下;
这篇文章中简单讲解了 AQS的原理,并且着重讲解了独占模式(排它锁)的 ReentrantLock,可以先将这篇文章阅读下,再来看 CountDownLatch 就会感觉简单些,逻辑也更加清晰些。
❞
❤ 点赞 + 评论 + 转发 哟
如果本文对您有帮助的话,请挥动下您爱发财的小手点下赞呀,您的支持就是我不断创作的动力,谢谢!
您可以微信搜索【木子雷】公众号,大量Java学习干货文章,您可以来瞧一瞧哟!
- 文章连结: https://leishen6.github.io/2020/05/31/AQS_CountDownLatch_source_code_analysis/
- 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-ND 4.0 许可协议。转载请注明出处!