Disruptor高性能之道-环形数组RingBuffer

分布式朝闻道

共 2504字,需浏览 6分钟

 · 2022-03-02

Disruptor,一款超高性能、超低延迟的并发编程框架。这里用了两个“超”来突出它在性能上的优越性。

它的性能远远超过了传统并发编程基于锁同步,阻塞队列的实现方案,在高性能后端编程中,disruptor是一个不错的选择。

Disruptor从何而来?

Disruptor的爆火起源于软件开发大师 martin fowler(马丁富勒)在自己网站上一篇文章,原文链接 文章介绍了外汇交易平台LMAX使用并开源的一种架构方案。

LMAX使用该方案实现了难以置信的 "单线程每秒处理600w订单" 的惊人能力。业务处理逻辑基于 完全运行内存运行 + 事件溯源 方式驱动。

Disruptor目前已经被LMAX开源,github地址 https://github.com/LMAX-Exchange/disruptor。

Disruptor当前最新稳定版的maven坐标为:



    com.lmax
    disruptor
    3.4.4

感兴趣的读者可以自行添加。

Disruptor有何特点?

Disruptor性能优越,必然有其设计上的独到之处,一般来说,我们认为Disruptor有以下特点:

  • Disruptor是面向并发编程的高性能框架,它在开发上简化了并发程序编码难度,性能上也是JUC并发包的数倍乃至十几倍;
  • Disruptor是CPU友好的、无锁的,基于单线程方式对任务进行调度,减少了上下文切换对系统资源的开销;
  • Disruptor底层数据结构基于数组,通过预加载方式提前加载对象到内存;Disruptor不会清理缓存中的数据,而是通过覆盖对象属性方式实现数据的读写,这降低了GC频率,使得系统资源的使用趋于平稳;
  • Disruptor能够避免“伪共享”,通过缓存行填充机制,Disruptor避免了伪共享对并发读写变量的消耗,消除了不必要缓存未命中。

Disruptor的环形数组RingBuffer

Ringbuffer(环形缓冲区/环形数组)是Disruptor的核心底层数据结构。

它不同于传统的阻塞队列(如:ArrayBlockingQueue)是从某一端入队,另外一端出队,而是一种首尾相连的环形结构。

ringbuffer.png

之所以叫它 buffer,我想大概是因为这个环形队列是作为不同线程(or上下文)之间传递数据媒介,类似于一个缓冲区。

RingBuffer拥有一个序号,指向数组中下一个可用的元素,需要注意的是Disruptor中的RingBuffer没有头尾指针,而只通过序号(sequence)就实现了生产者与消费者之间的进度协调。

RingBuffer可以一直填充吗?

假如不断地填充RingBuffer,那么必然会发生sequence一直增加,直到绕过环,覆盖原有的内容。

Disruptor是通过barrier实现了是否要覆盖原有内容的判断,这部分内容后面会说到。

如何定位RingBuffer中的元素呢?

正如我们在前面所说,RingBuffer本质上是个数组,那么必然可以通过数组的偏移量offset或者说index,定位到具体的元素。

在实际的开发中,我们常通过取模运算来获取元素在数组中的偏移量。也就是  「序号 % 长度 == 索引」

假设有8个元素,那么元素序号为13的元素就位于:

13 % 8 = 5

对于Disruptor而言,它强制要求数组的size初始化为 2的N次方,如 1024 * 1024。

设置为2的N次方有这样的好处:可以通过位运算更快速定位到元素位置。公式为:

seq & (ringBufferSize - 1) == index

在Disruptor中, ringBufferSize-1 称为mask,即掩码。

RingBuffer中的数据是如何预热的?

RingBuffer通过预分配对象机制来降低GC的影响。在实际运行过程中,业务从RingBuffer中获取对应sequence位置的对象引用,对该引用指向的对象属性赋值,通过覆盖写方式而不是直接覆盖整个对象的方式,保证了对象引用在整个disruptor存活的周期内都存在,保证GCRoot始终存在,因此能够大幅降低GC的影响。

这也是Disruptor高性能保证的策略之一,由于Disruptor主要使用场景之一就是低延迟环境,因此必须减少运行时内存分配,从而减少垃圾回收导致的系统停顿(STW)。

这种预加载机制在其他的中间件也有使用,如RocketMQ的commitLog也是在broker启动时就创建固定1G的文件,便于启动完成便可进行写入而不需要进行运行期创建。

Disruptor的RingBuffer数据预热具体的实现,查看Disruptor源码:

Disruptor初始化过程中会初始化RingBuffer:

    RingBuffer( EventFactory eventFactory,Sequencer sequencer){
        super(eventFactory, sequencer);
    }

RingBuffer是RingBufferFields子类:

public final class RingBuffer extends RingBufferFields implements Cursored, EventSequencer, EventSink

初始化RingBuffer时会先调用父类构造:


    RingBufferFields(EventFactory eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        // 用于计算index的掩码,公式:seq & (ringBufferSize - 1) == index

        this.indexMask = bufferSize - 1;

        // 初始化RingBuffer数组

        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];

        // 预填充RingBuffer数组
        fill(eventFactory);
    }

接着调用fill方法预填充数组,实现逻辑就是为数组的每个index填充一个对象实例。

    private void fill(EventFactory eventFactory){
        for (int i = 0; i < bufferSize; i++){
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

填充操作通过用户定义的eventFactory实现,该工厂一般写法为:

public class OrderEventFactory implements EventFactory {
    @Override
    public OrderEvent newInstance() {
        // new 一个空的orderEvent对象即可
        // 就是为了返回空的event对象
        return new OrderEvent();
    }
}

小结

本文主要介绍了Disruptor的背景,并重点介绍了Disruptor高性能实现机制之一的:环形数组RingBuffer以及预填充数组。

这些机制值得开发者不断学习思考并运用到日常工作中去。


浏览 109
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报