案例驱动学disruptor框架

分布式朝闻道

共 9946字,需浏览 20分钟

 · 2021-06-12

disruptor框架:通过填充缓冲行,消除了CPU的伪共享。它的模型 类似于生产者-消费者模型,我们可以类比JUC中的ArrayBlockingQueue。

对于disruptor而言,业务逻辑是完全运行在内存中的,它使用事件源驱动的方式,为事件预分配内存,避免了运行时因垃圾回收以及内存分配产生增加额外的开销,从而影响性能。

使用disruptor框架有以下几个步骤:

  1. 首先需要定义一个event类,用于创建Event类实例对象
  2. 还需要有个监听事件类,用于处理数据event类,可以理解成是事件的消费者
  3. 编写生产者组件向disruptor容器中去投递数据(这里的数据就是事件)
  4. 最终需要实例化一个disruptor实例,配置参数,编写disruptor核心组件,运行代码。

我们根据这些步骤,结合代码详细看一下如何使用disruptor框架。

定义事件

首先需要定义事件,这里简单的给一个OrderEvent

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {

    private long value;
}

二、提供一个事件工厂。disruptor需要调用该事件工厂,实例化出对应的空对象(无属性)

public class OrderEventFactory implements EventFactory<OrderEvent> {


    @Override
    public OrderEvent newInstance() {
        // new 一个空的orderEvent对象即可
        // 就是为了返回空的event对象
        return new OrderEvent();
    }
}

可以看出,自定义的事件工厂需要实现EventFactory接口,这样disruptor框架就可以回调该接口以实例化对应的事件对象

三、编写EventHandler实现类,它本质上可以理解为是对应业务的消费者,即:对事件的具体执行逻辑

public class OrderEventHandler implements EventHandler<OrderEvent> {

    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("消费者: receive->" + event);
    }
}

在onEvent回调方法中,实现具体的消费逻辑,这里的Event对象已经是具体的业务对象了,是赋值过属性的,那么属性是如何被赋值的呢?

四、既然已经有了消费者,那么肯定会有对应的生产者

编写一个生产者类,持有一个RingBuffer引用,RingBuffer是disruptor中的一个核心数据结构,它事实上保存了实践对象,可以把它看做线程池中的阻塞队列(先简单这么理解,后面会深入解释)

public class OrderEventProducer {

    private RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void sendData(ByteBuffer data) {

        System.out.println("生产者: sendData->" + data.getLong(0));

        // 1. 在生产者发送消息的时候 首先需要从我们的RingBuffer中获取一个可用的序号
        long sequence = this.ringBuffer.next();
        try {
            // 2. 根据这个序号找到具体的元素  此时获取到的event是一个空对象(属性未赋值)
            OrderEvent orderEvent = this.ringBuffer.get(sequence);
            // 3. 进行实际赋值处理即可
            orderEvent.setValue(data.getLong(0));
        } finally {
            // 4. 提交发布操作
            this.ringBuffer.publish(sequence);
        }
    }
}
  1. 通过构造方法(实际上别的方式也可以,比如setter传参或者SpringBean的注入等方式)将RingBuffer引入传递给成员变量
  2. sendData方法接收一个ByteBuffer对象,实际上可以是任意的对象
  3. 调用 this.ringBuffer.next(); 从RingBuffer中取出一个可用的序号
  4. 根据取出的序号找到一个具体的元素,注意这时候从RingBuffer中取出的是一个空对象,(并不是NULL)是通过事件工厂new出来的一个普通POJO
  5. 此时就可以为事件进行属性赋值
  6. 最终通过RingBuffer的publish方法提交序号,最终实际上影响的是对应序号上的事件对象实例的引用

五、编写测试用例,运行并观察结果

public class DisruptorDemo {

    public static void main(String[] args) {
        // 1. 参数准备工作
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 1024 * 1024;
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors(),
                Runtime.getRuntime().availableProcessors() * 2,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable runnable) {
                        Thread thread = new Thread(runnable);
                        thread.setName("threadpool-");
                        return thread;
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        /**
         * 2. 实例化disruptor对象
         * eventFactory:   消息event工厂对象
         * ringBufferSize: 容器长度
         * executor:       线程池 建议使用自定有线程池
         * ProduceType:    生产者是单还是多
         * waitStrategy:   等待策略
         */
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                orderEventFactory,
                ringBufferSize,
                threadPoolExecutor,
                ProducerType.SINGLE,
                new BlockingWaitStrategy()
        );

        // 3. 添加消费者监听  构建disruptor与消费者的关联关系
        disruptor.handleEventsWith(new OrderEventHandler());

        // 4. 启动disruptor
        disruptor.start();

        // 5. 获取实际存储数据的容器:RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();

        OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);
        ByteBuffer byteBuffer = ByteBuffer.allocate(32);
        for (long i = 0; i < 100; i++) {
            byteBuffer.putLong(0, i);
            orderEventProducer.sendData(byteBuffer);
        }

        disruptor.shutdown();
        threadPoolExecutor.shutdown();
    }
}
  1. 首先进行了一些简单的参数准备工作,实例化了事件工厂、初始化了ringBufferSize、自定义了一个线程池
  2. 实例化disruptor对象,暂时指定生产者类型为单生产者,后续再介绍多生产者如何使用
  3. 为disruptor添加消费者监听,实际上可以添加多个,构建出disruptor与消费者的关联关系
  4. 启动disruptor
  5. 获取ringBuffer实例,将引用传递给生产者
  6. 通过生产者为ringBuffer中的事件进行赋值,并发布 消费者消费事件对象
  7. 关闭disruptor

运行案例,日志打印如下

省略部分内容.....
生产者: sendData->96
消费者: receive->OrderEvent(value=95)
生产者: sendData->97
消费者: receive->OrderEvent(value=96)
生产者: sendData->98
消费者: receive->OrderEvent(value=97)
生产者: sendData->99
消费者: receive->OrderEvent(value=98)

可以看到,生产顺序与消费顺序都是有序的,并且实现了生产者逻辑与消费者逻辑的解耦。

本文主要从案例出发,对disruptor先入为主的进行了体验,后续我们会深入各个组件详细认识这个高性能线程间消息传递库。


浏览 51
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报