前言:

CountDownLatch(倒计时器) 是JDK并发包下的一个同步工具类,其内部是依赖于AQS(AbstractQueuedSynchronizer)的 共享锁(共享模式)。

注意: 建议阅读本文前,可以先看看这篇文章《AQS之ReentrantLock源码解析》,这篇文章中对AQS进行了简要介绍,有助于快速理解本文。

本文主线:

  • CountDownLatch 应用场景
  • CountDownLatch 源码分析

应用场景:

针对于 CountDownLatch 倒计时器, 一种典型的场景就是类似于 火箭发射 ;在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检测,只有等到所有的检查完毕且没问题后,引擎才能点火。

那么在检测环节中多个检测项可以同时并发进行的,只有所有检测项全部完成后,才会通知引擎点火的,这里就可以使用 CountDownLatch 来实现。

CountDownLatch 到底是怎么实现的呢?别着急,模拟代码奉上:

模拟代码:

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @PACKAGE_NAME: com.lyl.aqs
 * @ClassName: SimulateRocketLaunchDemo
 * @Description:  使用 CountDownLatch 模拟火箭发射过程
 * @Date: 2020-05-31 14:17
 **/

public class SimulateRocketLaunchDemo implements Runnable{

    // 设置了 10 个检测项
    static final CountDownLatch latch = new CountDownLatch(10);
    static final SimulateRocketLaunchDemo demo = new SimulateRocketLaunchDemo();

    @Override
    public void run(){
        // 模拟检查任务
        try {
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println(Thread.currentThread().getName().split("-")[3]
                    + " check complete !");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //计数减一
            //放在finally避免任务执行过程出现异常,导致countDown()不能被执行
            latch.countDown();
        }
    }

    // test
    public static void main(String[] args) throws InterruptedException {
        // 设置线程数为10的固定线程池
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i=0; i<10; i++){
            // 提交任务
            exec.submit(demo);
        }

        // 等待检查,只有当10个检测项全部检测完成后,才会唤醒处于等待状态的main主线程,让其继续执行
        latch.await();
        // 发射火箭
        System.out.println("Fire!");
        // 关闭线程池
        exec.shutdown();
    }
}

再提供一个CountDownLatch 的实际应用的例子,传送门:https://blog.csdn.net/feichitianxia/article/details/97814645

源码分析:

由于CountDownLatch 内部的实现是依赖于AQS的 共享锁(共享模式) 的,所以在阅读其源码前,需要对AQS的基础有一定的了解,否则源码看起来会相对困难很多;

如果对AQS一点也不了解的话,可以先阅读下 《AQS之ReentrantLock源码解析》 文章了解下AQS,这样在后面分析CountDownLatch 源码时会简单些;

什么是共享锁、排它锁?

  • 共享锁:允许多个线程可以同时获取一个锁; (CountDownLatch 使用的共享锁)
  • 排它锁:一个锁在同一时刻只运行一个线程拥有;(ReentrantLock 使用的排它锁)

1、接下来主要分析CountDownLatch的这几个方法:

通过上图中的CountDownLatch 几个方法深入其源码进行分析;

2、构造方法 new CountDownLatch(10) :

public CountDownLatch(int count) {
    if (count < 0) {
        throw new IllegalArgumentException("count < 0");
    }
    // CountDownLatch内部维护了Sync内部类,内部类继承了AQS父类
    this.sync = new Sync(count);
}

①、接下来看看 Sync 类的构造方法:

Sync(int count) {
    /**
     * setState()方法是AQS提供的state变量的写方法, state变量被volatile修饰,由于volatile的
     * happen-before规则,被 volatile 修饰的变量单独读写操作具有原子性
     */

    setState(count);
}

然后在看看AQS提供的setState(int newCount) 方法 和 state变量:

/**
 * The synchronization state.
 */

private volatile int state;

protected final void setState(int newState) {
    state = newState;
}

3、CountDownLatch的 getCount( ) 方法:

public long getCount() {
    // 调用 sync 内部类的getCount()方法
    return sync.getCount();
}

①、Sync 内部类的getCount( ) 方法:

int getCount() {
    // Sync 调用其父类AQS的 getState()方法
    return getState();
}

AQS的getState()方法:

/**
 * The synchronization state.
 */

private volatile int state;

protected final int getState() {
    // 返回state同步状态值
    return state;
}

4、CountDownLatch 的 countDown( ) 方法:

public void countDown() {
    // 调用Sync内部类的父类AQS的 releaseShared()共享锁释放模版方法
    sync.releaseShared(1);
}

①、AQS的 releaseShared( ) 方法:

public final boolean releaseShared(int arg) {
    /**
     * tryReleaseShared()方法是尝试释放锁,这个方法在AQS的子类Sync进行了重写
     */

    if (tryReleaseShared(arg)) {
        /**
         * 如果tryReleaseShared()方法尝试释放锁成功,并且此时state同步状态变量值为0时,
         * 则执行doReleaseShared方法,将在同步队列中阻塞的线程唤醒
         */

        doReleaseShared();
        return true;
    }
    return false;
}

②、CountDownLatch 的 tryReleaseShared( )方法:

protected boolean tryReleaseShared(int releases) {
    // for(;;) 与 while(true) 一样的死循环
    for (;;) {
        // 获取state同步变量值
        int c = getState();
        // 如果state同步变量值已经是0,则返回false
        if (c == 0)
            return false;
        // 将state同步变量值进行减一
        int nextc = c-1;
        // 使用AQS提供的CAS算法方法更新state变量值
        if (compareAndSetState(c, nextc))
            // 如果nextc等于0,代表此时state同步变量值为0了,返回true
            return nextc == 0;
    }
}

③、AQS提供的 doReleaseShared( ) 方法: 唤醒同步队列中阻塞的线程

Node节点的四种状态值请参考文章:《AQS之ReentrantLock源码解析》

private void doReleaseShared() {
        for (;;) {
            // head同步队列中的队列头
            Node h = head;
            if (h != null && h != tail) {
                /**
                 * 获取head节点的状态,AQS中的Node内部节点类中定义了四种状态值
                 * 四种状态值请参考上面 ↑ 文章
                 */

                int ws = h.waitStatus;
                /**
                 * SIGNAL是四中状态值之一:表示当前节点中的线程可以尝试被唤醒 
                 */

                if (ws == Node.SIGNAL) {
                    // 将节点的状态使用CAS算法更新为0,0表示初始化状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        // 状态更新0失败,则进行下次循环
                        continue;     
                    // 状态成功更新为0后,唤醒节点中的线程,此方法具体源码可参考上面 ↑ 文章
                    unparkSuccessor(h);
                }
                /**
                 * 如果节点状态值为0,则使用CAS方法更新节点状态值为 Node.PROPAGATE
                 * PROPAGATE 是四中状态值之一:该状态表示可运行,只在共享模式下使用
                 */

                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               
            }
            if (h == head)  
                // 跳出循环
                break;
        }
    }

5、CountDownLatch 的 await( ) 方法:

await( ) 方法:

  • 当state状态变量值不为0时,就一直将线程(main主线程)阻塞在同步队列中;
  • 当state变量值为0时,会尝试将线程唤醒,并将唤醒操作传播下去。
public void await() throws InterruptedException {
    // 调用Sync内部类的父类AQS的模版方法 acquireSharedInterruptibly()方法
    sync.acquireSharedInterruptibly(1);
}

①、AQS的模版方法 acquireSharedInterruptibly(1) 方法:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException 
{
    /**
     * interrupted()判断当前线程是否被中断,注意:此方法会默认清除线程的中断标志
     */

    if (Thread.interrupted())
        throw new InterruptedException();
    /**
     * tryAcquireShared()尝试访问共享锁,如果state同步状态变量值不为0,则返回-1
     */

    if (tryAcquireShared(arg) < 0)
        /**
         * 将阻塞的线程创建Node节点,绑定节点类型为共享模式,并将创建的节点加入同步队列的队尾
         * 并且当新创建的Node节点的前驱结点为head时,就会尝试唤醒下一个节点中的线程
         */

        doAcquireSharedInterruptibly(arg);
}

②、AQS提供的 doAcquireSharedInterruptibly( ) 方法:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException 
{
    // 创建新Node节点,绑定共享模式,并将其插入到队尾
    final Node node = addWaiter(Node.SHARED);
    // failed是中断标志位
    boolean failed = true;
    try {
        for (;;) {
            // 返回当前节点的前驱结点
            final Node p = node.predecessor();
            if (p == head) {
                // 判断当前state同步变量值是否为0,不是0返回-1,是0返回1
                int r = tryAcquireShared(arg);
                // 如果 r大于0,表示state变量值为0
                if (r >= 0) {
                    // 将当前节点设置head队列头,并且尝试唤醒同步队列中阻塞的线程
                    setHeadAndPropagate(node, r);
                    p.next = null// help GC
                    failed = false;
                    return;
                }
            }
            /**
             * shouldParkAfterFailedAcquire()是对当前节点的前驱结点的状态进行判断,以及去针对各种
             * 状态做出相应处理,由于文章篇幅问题,具体源码本文不做讲解;只需知道如果前驱结点p的状态为
             * SIGNAL的话,就返回true。
             *
             * parkAndCheckInterrupt()方法会使当前线程进去waiting状态,并且查看当前线程是否被中断,
             * interrupted() 同时会将中断标志清除。
             */

            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            /**
             * 如果for(;;)循环中出现异常,并且failed=false没有执行的话,cancelAcquire方法
             * 就会将当前线程的状态置为 node.CANCELLED 已取消状态,并且将当前节点node移出
             * 同步队列。
             */

            cancelAcquire(node);
    }
}

③、AQS提供的 setHeadAndPropagate( ) 方法:

 private void setHeadAndPropagate(Node node, int propagate) {
     Node h = head; 
     // 设置为队首
     setHead(node);
     
     if (propagate > 0 || h == null || h.waitStatus < 0 ||
         (h = head) == null || h.waitStatus < 0) {
         Node s = node.next;
         // 如果s节点是共享模式的,则调用doReleaseShared()方法
         if (s == null || s.isShared())
             // 唤醒阻塞在同步队列中的线程
             doReleaseShared();
     }
 }

-- END --

本文解析 CountDownLatch 源码已经写完了,如果大家在看的时候,有些地方没看明白的话,请务必先将这篇文章 《AQS之ReentrantLock源码解析》 阅读下;

这篇文章中简单讲解了 AQS的原理,并且着重讲解了独占模式(排它锁)的 ReentrantLock,可以先将这篇文章阅读下,再来看 CountDownLatch 就会感觉简单些,逻辑也更加清晰些。

点赞 + 评论 + 转发 哟

如果本文对您有帮助的话,请挥动下您爱发财的小手点下赞呀,您的支持就是我不断创作的动力,谢谢!

您可以微信搜索【木子雷】公众号,大量Java学习干货文章,您可以来瞧一瞧哟!