Java多线程之CyclicBarrier

ProjectDaedalus

共 3820字,需浏览 8分钟

 · 2021-11-30

这里就JUC包中的CyclicBarrier类做相关介绍

abstract.jpeg

概述

JUC中的CyclicBarrier类是一个并发控制工具。其可以使线程在栅栏处进行等待。当指定数量的线程全部到达栅栏处后栅栏才会打开,从而使各线程结束阻塞继续向下执行。其主要方法如下所示,可以看到在线程全部到达栅栏时,还可以通过barrierAction参数设置准备打开栅栏前需执行的任务。其中,该任务由最后一个到达栅栏的线程负责执行。具体地,线程调用await方法实现告诉CyclicBarrier自己已经到达栅栏处,并阻塞等待栅栏打开

// 创建一个指定计数器值的CyclicBarrier实例
public CyclicBarrier(int parties);

// 创建一个指定计数器值的CyclicBarrier实例, 并指定栅栏打开前需执行的任务
public CyclicBarrier(int parties, Runnable barrierAction);

// 线程阻塞等待栅栏打开
public int await() throws InterruptedException, BrokenBarrierException;

// 支持超时的await方法
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException;

// 唤醒其他正在栅栏处被阻塞的线程(即抛出BrokenBarrierException异常), 同时将CyclicBarrier实例恢复为初始化状态,以便下一次使用
public void reset();

基本实践

下面即是一个CyclicBarrier的基本实践示例


public class CyclicBarrierTest1 {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");

    @Test
    public void test1() throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        Runnable initTask = () -> {
            info("---------------------------------");
        };

        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, initTask);

        Stream.of("张三","李四","王二")
            .map( name -> new PlayGame(name, cyclicBarrier) )
            .forEach(playGame -> {
                threadPool.execute(playGame);
            } );

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
        info("Game Over");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String threadName = Thread.currentThread().getName();
        String log = "["+time+"] "+ msg+" <"+threadName+">";
        System.out.println(log);
    }

    /**
     * 模拟业务耗时
     */

    private static void doSomeWork() {
        try{
            Integer second = RandomUtils.nextInt(3,20);
            System.out.println("second: " + second);
            Thread.sleep( second * 1000 );
        }catch (Exception e) {
            System.out.println( "Happen Exception: " + e.getMessage());
        }
    }

    @AllArgsConstructor
    private static class PlayGame implements Runnable{

        private String name;

        private CyclicBarrier cyclicBarrier;

        @Override
        public void run() {
            // 模拟业务耗时
            doSomeWork();
            info(name + " 上线");
            // 阻塞等待其他玩家上线
            try{
                cyclicBarrier.await();
            }catch (Exception e) {
                System.out.println( "Happen Exception: " + e);
            }

            info(name + " 选择角色 开始");
            // 模拟业务耗时
            doSomeWork();
            info(name + " 选择角色 结束");
            // 阻塞等待其他玩家选择角色
            try{
                cyclicBarrier.await();
            }catch (Exception e) {
                System.out.println( "Happen Exception: " + e);
            }

            info(name + " 开始游戏");

        }
    }
}

从测试结果可以看出,当用户 开始选择角色 或 开始游戏时,各线程是同时开始的。至此也可以看出其与CountDownLatch的显著区别,后者是一次性的,而前者CyclicBarrier则可以重复使用

figure 1.jpeg

基本原理

通过上面的代码示例,可以看到CyclicBarrier与CountDownLatch相比功能很类似。只不过前者可以重复使用,而后者则是一次性的。但二者在实现上却大相径庭,CountDownLatch是直接基于AQS实现的。而CyclicBarrier则是利用ReentrantLock、Condition进行实现的。具体地,当线程调用CyclicBarrier的await方法时,如果未达到指定数量时,则是通过Condition条件变量的await方法进行阻塞的;如果是最后一个线程则会通过Condition条件变量的signalAll方法来唤醒所有被阻塞的线程

与此同时,由于CyclicBarrier是可重复使用的。故每一轮结束后,其内部会通过nextGeneration方法生成所谓的下一代CyclicBarrier。本质上相当于重新实例化了一次CyclicBarrier

Note

  1. 在实际使用CyclicBarrier过程中,需要非常小心处理BrokenBarrierException异常。本文示例代码为了简便,故省略了异常处理过程。因为发生该异常说明栅栏被损坏了。推荐的处理措施有:一方面,调用CyclicBarrier的reset方法,来唤醒其他由于调用await方法而被阻塞的线程以避免一直被阻塞,同时将CyclicBarrier实例恢复至初始化状态;另一方面,推荐使用具有超时机制的await方法,以避免线程被永久性阻塞

参考文献

  1. Java并发编程之美 翟陆续、薛宾田著
浏览 18
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报