跳至主要內容

JUC 八股20 - 并发工具类

codejavajuc八股约 1515 字大约 5 分钟

并发工具类

作用
Semaphore限制线程的数量
Exchanger两个线程交换数据
CountDownLatch线程等待直到计数器减为 0 时开始工作
CyclicBarrier作用跟 CountDownLatch 类似,但是可以重复使用
Phaser增强的 CyclicBarrier

CountDownLatch

CountDownLatch 是 JUC 中的一个同步工具类,用于协调多个线程之间的同步,确保主线程在多个子线程完成任务后继续执行

它的核心思想是通过一个倒计时计数器来控制多个线程的执行顺序

核心思想

利用共享锁实现,内部同样实现了一个内部类继承 AQS, 在一开始的时候就是已经上了count层锁的状态,也就是state = count

  • countDown()就是解1层锁,也就是靠这个方法一点一点把state的值减到0
  • await()就是(加)获取共享锁,但是必须state为0才能(加)获取锁成功,否则按照AQS的机制,会进入等待队列阻塞,(加)获取锁成功后结束阻塞

简单使用

在使用的时候,我们需要先初始化一个 CountDownLatch 对象,指定一个计数器的初始值,表示需要等待的线程数量

然后在每个子线程执行完任务后,调用 countDown() 方法,计数器减 1

接着主线程调用 await() 方法进入阻塞状态,直到计数器为 0,也就是所有子线程都执行完任务后,主线程才会继续执行

class CountDownLatchExample {
  public static void main(String[] args) throws InterruptedException {
    int threadCount = 3;
    CountDownLatch latch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
        new Thread(() -> {
            try {
                Thread.sleep((long) (Math.random() * 1000)); // 模拟任务执行
                System.out.println(Thread.currentThread().getName() + " 执行完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                latch.countDown(); // 线程完成后,计数器 -1
            }
        }).start();
    }

    latch.await(); // 主线程等待
    System.out.println("所有子线程执行完毕,主线程继续执行");
  }
}

CyclicBarrier

CyclicBarrier /ˈsaɪklɪk ˈbæriər/ 的字面意思是可循环使用的屏障,用于多个线程相互等待,直到所有线程都到达屏障后再同时执行

alt text
alt text

在使用的时候,我们需要先初始化一个 CyclicBarrier 对象,指定一个屏障值 N,表示需要等待的线程数量。

然后每个线程执行 await() 方法,表示自己已经到达屏障,等待其他线程,此时屏障值会减 1。

当所有线程都到达屏障后,也就是屏障值为 0 时,所有线程会继续执行。

与 CountDownLatch 的区别

CyclicBarrier 让所有线程相互等待,全部到达后再继续;CountDownLatch 让主线程等待所有子线程执行完再继续

对比项CyclicBarrierCountDownLatch
主要用途让所有线程相互等待,全部到达后再继续让主线程等待所有子线程执行完
可重用性✅ 可重复使用,每次屏障打开后自动重置❌ 不可重复使用,计数器归零后不能恢复
是否可执行回调✅ 可以,所有线程到达屏障后可执行 barrierAction❌ 不能
线程等待情况所有线程互相等待,一个线程未到达,其他线程都会阻塞主线程等待所有子线程完成,子线程执行完后可继续运行
适用场景线程相互依赖,需要同步执行主线程等待子线程完成
示例场景计算任务拆分,所有线程都到达后才能继续主线程等多个任务初始化完成

Semaphore

Semaphore /ˈsɛməˌfɔːr/ —— 信号量,用于控制同时访问某个资源的线程数量,类似限流器,确保最多只有指定数量的线程能够访问某个资源,超过的必须等待

在使用 Semaphore 时,首先需要初始化一个 Semaphore 对象,指定许可证数量,表示最多允许多少个线程同时访问资源。

然后在每个线程访问资源前,调用 acquire() 方法获取许可证,如果没有可用许可证,则阻塞等待。

需要注意的是,访问完资源后,要调用 release() 方法释放许可证

class SemaphoreExample {
  private static final int THREAD_COUNT = 5;
  private static final Semaphore semaphore = new Semaphore(2); // 最多允许 2 个线程访问

  public static void main(String[] args) {
      for (int i = 0; i < THREAD_COUNT; i++) {
          new Thread(() -> {
              try {
                  semaphore.acquire(); // 获取许可(如果没有可用许可,则阻塞)
                  System.out.println(Thread.currentThread().getName() + " 访问资源...");
                  Thread.sleep(2000); // 模拟任务执行
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } finally {
                  semaphore.release(); // 释放许可
              }
          }).start();
      }
  }
}

Semaphore 可以用于流量控制,比如数据库连接池、网络连接池等。

假如有这样一个需求,要读取几万个文件的数据,因为都是 IO 密集型任务,我们可以启动几十个线程并发地读取。

但是在读到内存后,需要存储到数据库,而数据库连接数是有限的,比如说只有 10 个,那我们就必须控制线程的数量,保证同时只有 10 个线程在使用数据库连接。

用 Semaphore 控制并发数

// 信号量,只允许 10 个线程同时访问数据库
Semaphore semaphore = new Semaphore(10);

for (int i = 0; i < 30; i++) {
  new Thread(() -> {
    String data = readFile();  // 30 个线程可以同时读
    
    try {
      semaphore.acquire();   // 获取许可,最多 10 个能通过
      saveToDatabase(data);  // 同时只有 10 个线程在写数据库
    } finally {
      semaphore.release();   // 释放许可,其他等待的线程可以进来
    }
  }).start();
}

Exchanger

Exchanger——交换者,用于在两个线程之间进行数据交换

支持双向数据交换,比如说线程 A 调用 exchange(dataA),线程 B 调用 exchange(dataB),它们会在同步点交换数据,即 A 得到 B 的数据,B 得到 A 的数据。

如果一个线程先调用 exchange(),它会阻塞等待,直到另一个线程也调用 exchange()。

使用 Exchanger 的时候,需要先创建一个 Exchanger 对象,然后在两个线程中调用 e

上次编辑于: