> 文章列表 > CountDownLatch:Java中的同步工具

CountDownLatch:Java中的同步工具

CountDownLatch:Java中的同步工具

CountDownLatch:Java中的同步工具

    • 理解CountDownLatch
    • 使用CountDownLatch
    • 实践中的CountDownLatch
    • 总结

在多线程编程中,有时需要等待一个或多个线程完成它们的任务,然后再继续执行下一步操作。这种场景下,我们可以使用CountDownLatch来实现等待-通知机制。

理解CountDownLatch

CountDownLatch是Java中的一个同步工具,它允许一个或多个线程等待其他线程完成它们的操作后再继续执行。CountDownLatch包含一个计数器,该计数器初始化为一个正整数N。当一个线程完成一个操作时,计数器的值会减1。当计数器的值变为0时,所有等待中的线程将被释放。

CountDownLatch通常用于实现等待-通知机制,其中一个或多个线程等待其他线程完成它们的操作,然后再继续执行。例如,在一个多线程程序中,主线程可以使用CountDownLatch来等待所有工作线程完成它们的任务,然后再继续执行下一步操作。

使用CountDownLatch

下面是一个简单的示例,演示如何使用CountDownLatch:

import java.util.concurrent.CountDownLatch;public class CountDownLatchExample {public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3);Thread t1 = new Thread(() -> {System.out.println("Thread 1 is running");latch.countDown();});Thread t2 = new Thread(() -> {System.out.println("Thread 2 is running");latch.countDown();});Thread t3 = new Thread(() -> {System.out.println("Thread 3 is running");latch.countDown();});t1.start();t2.start();t3.start();latch.await();System.out.println("All threads have completed their tasks");}
}

在这个例子中,我们创建了一个CountDownLatch对象,计数器的初始值为3。然后,我们创建了三个线程t1、t2和t3,它们各自完成自己的任务,并调用了CountDownLatch的countDown()方法来减少计数器的值。最后,我们调用CountDownLatch的await()方法来等待所有线程完成它们的任务。当计数器的值变为0时,await()方法将返回,主线程将继续执行下一步操作。

实践中的CountDownLatch

最近需要删除公司的S3上的大量文件以及对应的MySQL中存储的索引。

由于要删除的量级比较大,且公司的S3没有开放批量删除的接口,因此一开始引入了多线程:

public void physicallyDelete(List<String> idList) {int pageId = 1;boolean isHasNextPage;do {PageHelper.startPage(pageId, Constants.DEFAULT_PAGE_SIZE);List<Info> infoList = infoDAO.getByIdList(idList);PageInfo<Info> page = new PageInfo<>(infoList);isHasNextPage = page.isHasNextPage();Stopwatch stopwatch = Stopwatch.createStarted();infoList.forEach(info ->threadPool.execute(() -> {deleteSmartBucketAndMySQL(info);}));LOGGER.info("s3和数据库删除成功, size:{}, cost:{}", infoList.size(),stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));} while (isHasNextPage);
}private void deleteS3AndMySQL(Info info) {String key = getKeyFromS3Url(info.getVideoUrl());try {Stopwatch stopwatch = Stopwatch.createStarted();S3Manager.deleteFile(bucketName, key);LOGGER.info("s3删除成功, bucket:{}, key:{}, cost:{}", bucketName, key,stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));} catch (Exception e) {LOGGER.error("s3删除失败, bucket:{}, key:{}", bucketName, key, e);return;}	infoDAO.physicallyDeleteByIdList(Collections.singletonList(info.getId()));
}

但是发现了一个问题,从理论上来讲,删除一个S3上的文件,应该对应删除一条MySQL上的记录;但是在日志中发现,每删除一条MySQL上的记录,就多次重复触发了删除S3上的对应文件。

排查发现,由于在physicallyDelete方法中存在分页查询,有可能在deleteS3AndMySQL方法中已经删除了S3,但尚未删除MySQL中的记录时,已经进行了分页查询下一页,线程池中的其他线程又运行了deleteS3AndMySQL方法,导致重复调用了S3的删除接口。

为了解决这个问题,我们可以引入CountDownLatch,代码如下:

public void physicallyDelete(List<String> idList) {int pageId = 1;boolean isHasNextPage;do {PageHelper.startPage(pageId, Constants.DEFAULT_PAGE_SIZE);List<Info> infoList = infoDAO.getByIdList(idList);PageInfo<Info> page = new PageInfo<>(infoList);isHasNextPage = page.isHasNextPage();CountDownLatch latch = new CountDownLatch(infoList.size());Stopwatch stopwatch = Stopwatch.createStarted();infoList.forEach(info ->threadPool.execute(() -> {try {deleteS3AndMySQL(info);} catch (Exception e) {LOGGER.info("deleteS3AndMySQL error", e);} finally {latch.countDown();}}));LOGGER.info("s3和数据库删除成功, size:{}, cost:{}", infoList.size(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));try {latch.await();} catch (Exception e) {LOGGER.info("latch.await异常", e);Thread.currentThread().interrupt();}} while (isHasNextPage);
}

这样,由于CountDownLatch的存在,就会等到线程池中的线程将分页查出的全部数据处理完毕后,再去查出下一页数据进行处理,从而避免多次重复调用S3删除接口。

总结

CountDownLatch是Java中的一个同步工具,它允许一个或多个线程等待其他线程完成它们的操作后再继续执行。CountDownLatch通常用于实现等待-通知机制,其中一个或多个线程等待其他线程完成它们的操作,然后再继续执行。在多线程编程中,CountDownLatch是一种非常有用的工具,可以帮助我们实现复杂的同步逻辑。