DXP BLOG
首页
  • JDk
  • Spring系列
  • 微服务
  • Json
  • Netty
  • Bug
  • Mysql
  • Postgresql
  • 达梦
  • activemq
  • rabbitmq
  • rocketmq
  • redis
  • Vue
  • React
  • Angular
  • Javascript
  • Typescript
linux
  • 协议
  • 加解密
  • 分类
  • 标签
  • 归档
Gitee (opens new window)

董新平

一个普普通通的代码程序猿
首页
  • JDk
  • Spring系列
  • 微服务
  • Json
  • Netty
  • Bug
  • Mysql
  • Postgresql
  • 达梦
  • activemq
  • rabbitmq
  • rocketmq
  • redis
  • Vue
  • React
  • Angular
  • Javascript
  • Typescript
linux
  • 协议
  • 加解密
  • 分类
  • 标签
  • 归档
Gitee (opens new window)
  • JDK

    • ThreadPoolExecutor
    • Object类12中方法及作用
    • CountDownLatch
      • 构造函数
      • 重点方法 countDown await
        • countDown
        • await
      • 使用案例
    • 读写锁
    • Atomic
  • Spring系列

  • Json

  • JAVA
  • JDK
dongxinping
2022-08-24
目录

CountDownLatch

# CountDownLatch

jdk17

允许一个或者多个线程去等待其他线程完成操作.

# 构造函数

   public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
1
2
3
4

CountDownLatch实例化时,其实是实例化了一个内部的 Sync 对象, 并将形参透传给Sync, Sync 其实是一个 AQS 子类

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
          // 设置同步状态的值
            setState(count);
        }

        int getCount() {
            // 设置同步状态的值
            return getState();
        }

        // 加共享锁   尝试获取同步, 且状态值为0比欧式成功,否则失败. 
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // 释放加共享锁  尝试释放同步状态,
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                // 状态为0, 直接放回
                if (c == 0)
                    return false;
                // 通过cas将同步状态值减1
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 重点方法 countDown await

countDown await 是 CountDownLatch 类中最重要的两个方法,一个是将同步状态值减1, 一个则是阻塞等待.

# countDown

内部实质上是调用 Sync 的 tryReleaseShared(1), 将状态值减1

    /**  CountDownLatch */
   public void countDown() {
        // 调动 AQS 的 releaseShared 方法
        sync.releaseShared(1);
    }

    /**  AbstractQueuedSynchronizer */
    public final boolean releaseShared(int arg) {
      // 调用子类的 CountDownLatch.Sync 自旋减同步状态值
      if (tryReleaseShared(arg)) {
          signalNext(head);
          return true;
      }
      return false;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# await

阻塞线程,等待状态值变为0

  /**  CountDownLatch */
  public void await() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
  }

  /**  AbstractQueuedSynchronizer */
  public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  
  // 中断了则直接抛出 InterruptedException 异常
  if (Thread.interrupted() ||

      // 尝试 `Sync` 类中重写的 `tryAcquireShared` 尝试加锁, 
      // 如果加锁失败, 调用 acquire
      // 加锁成功,直接放回,结束阻塞状态
      (tryAcquireShared(arg) < 0 &&

        // 阻塞当前线程
        acquire(null, arg, true, true, false, 0L) < 0))
      throw new InterruptedException();
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 使用案例

  1. 创建一个线程池,用于执行并发任务
  2. 声明一个 CountDownLatch, 大小为任务的数量(10)
  3. 往线程池中丢10个任务
  4. 主线程通过 await 阻塞等待任务执行完毕
  5. 关闭线程池,结束程序
import java.time.LocalDateTime;
import java.util.concurrent.*;

public class Test {

    static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 3,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) throws InterruptedException {
        final int size = 10;

        final CountDownLatch latch = new CountDownLatch(size);
        System.out.println("处理开始");

        for (int i = 0; i < size; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + "---" + LocalDateTime.now());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    latch.countDown();
                }
            });
        };

        latch.await();
        System.out.println("处理完成");
        executor.shutdownNow();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

从输出的结果可以看出, 任务是并发执行的, 处理完成 这个业务会等待 latch.await()的阻塞状态结束

处理开始
pool-1-thread-7---2022-08-25T14:30:21.750001
pool-1-thread-5---2022-08-25T14:30:21.750052
pool-1-thread-3---2022-08-25T14:30:21.749940
pool-1-thread-10---2022-08-25T14:30:21.749927
pool-1-thread-9---2022-08-25T14:30:21.750023
pool-1-thread-4---2022-08-25T14:30:21.750008
pool-1-thread-8---2022-08-25T14:30:21.749924
pool-1-thread-1---2022-08-25T14:30:21.750070
pool-1-thread-6---2022-08-25T14:30:21.750029
pool-1-thread-2---2022-08-25T14:30:21.749884
处理完成
1
2
3
4
5
6
7
8
9
10
11
12
#源码阅读
上次更新: 2023/06/12, 10:31:06
Object类12中方法及作用
读写锁

← Object类12中方法及作用 读写锁→

最近更新
01
Redis数据类型
01-20
02
Atomic
12-27
03
编译安装Redis
12-27
更多文章>
dongxinping | Copyright © 2022-2024 Dongxinping | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式