案例驱动学disruptor框架
分布式朝闻道
共 9946字,需浏览 20分钟
· 2021-06-12
❝disruptor框架:通过填充缓冲行,消除了CPU的伪共享。它的模型 类似于生产者-消费者模型,我们可以类比JUC中的ArrayBlockingQueue。
❞
对于disruptor而言,业务逻辑是完全运行在内存中的,它使用事件源驱动的方式,为事件预分配内存,避免了运行时因垃圾回收以及内存分配产生增加额外的开销,从而影响性能。
使用disruptor框架有以下几个步骤:
首先需要定义一个event类,用于创建Event类实例对象 还需要有个监听事件类,用于处理数据event类,可以理解成是事件的消费者 编写生产者组件向disruptor容器中去投递数据(这里的数据就是事件) 最终需要实例化一个disruptor实例,配置参数,编写disruptor核心组件,运行代码。
我们根据这些步骤,结合代码详细看一下如何使用disruptor框架。
定义事件
首先需要定义事件,这里简单的给一个OrderEvent
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
private long value;
}
二、提供一个事件工厂。disruptor需要调用该事件工厂,实例化出对应的空对象(无属性)
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
// new 一个空的orderEvent对象即可
// 就是为了返回空的event对象
return new OrderEvent();
}
}
可以看出,自定义的事件工厂需要实现EventFactory接口,这样disruptor框架就可以回调该接口以实例化对应的事件对象
三、编写EventHandler实现类,它本质上可以理解为是对应业务的消费者,即:对事件的具体执行逻辑
public class OrderEventHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("消费者: receive->" + event);
}
}
❝在onEvent回调方法中,实现具体的消费逻辑,这里的Event对象已经是具体的业务对象了,是赋值过属性的,那么属性是如何被赋值的呢?
❞
四、既然已经有了消费者,那么肯定会有对应的生产者
❝编写一个生产者类,持有一个RingBuffer引用,RingBuffer是disruptor中的一个核心数据结构,它事实上保存了实践对象,可以把它看做线程池中的阻塞队列(先简单这么理解,后面会深入解释)
❞
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data) {
System.out.println("生产者: sendData->" + data.getLong(0));
// 1. 在生产者发送消息的时候 首先需要从我们的RingBuffer中获取一个可用的序号
long sequence = this.ringBuffer.next();
try {
// 2. 根据这个序号找到具体的元素 此时获取到的event是一个空对象(属性未赋值)
OrderEvent orderEvent = this.ringBuffer.get(sequence);
// 3. 进行实际赋值处理即可
orderEvent.setValue(data.getLong(0));
} finally {
// 4. 提交发布操作
this.ringBuffer.publish(sequence);
}
}
}
通过构造方法(实际上别的方式也可以,比如setter传参或者SpringBean的注入等方式)将RingBuffer引入传递给成员变量 sendData方法接收一个ByteBuffer对象,实际上可以是任意的对象 调用 this.ringBuffer.next(); 从RingBuffer中取出一个可用的序号 根据取出的序号找到一个具体的元素,注意这时候从RingBuffer中取出的是一个空对象,(并不是NULL)是通过事件工厂new出来的一个普通POJO 此时就可以为事件进行属性赋值 最终通过RingBuffer的publish方法提交序号,最终实际上影响的是对应序号上的事件对象实例的引用
五、编写测试用例,运行并观察结果
public class DisruptorDemo {
public static void main(String[] args) {
// 1. 参数准备工作
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 1024 * 1024;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName("threadpool-");
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
/**
* 2. 实例化disruptor对象
* eventFactory: 消息event工厂对象
* ringBufferSize: 容器长度
* executor: 线程池 建议使用自定有线程池
* ProduceType: 生产者是单还是多
* waitStrategy: 等待策略
*/
Disruptor<OrderEvent> disruptor = new Disruptor<>(
orderEventFactory,
ringBufferSize,
threadPoolExecutor,
ProducerType.SINGLE,
new BlockingWaitStrategy()
);
// 3. 添加消费者监听 构建disruptor与消费者的关联关系
disruptor.handleEventsWith(new OrderEventHandler());
// 4. 启动disruptor
disruptor.start();
// 5. 获取实际存储数据的容器:RingBuffer
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(32);
for (long i = 0; i < 100; i++) {
byteBuffer.putLong(0, i);
orderEventProducer.sendData(byteBuffer);
}
disruptor.shutdown();
threadPoolExecutor.shutdown();
}
}
首先进行了一些简单的参数准备工作,实例化了事件工厂、初始化了ringBufferSize、自定义了一个线程池 实例化disruptor对象,暂时指定生产者类型为单生产者,后续再介绍多生产者如何使用 为disruptor添加消费者监听,实际上可以添加多个,构建出disruptor与消费者的关联关系 启动disruptor 获取ringBuffer实例,将引用传递给生产者 通过生产者为ringBuffer中的事件进行赋值,并发布 消费者消费事件对象 关闭disruptor
运行案例,日志打印如下
省略部分内容.....
生产者: sendData->96
消费者: receive->OrderEvent(value=95)
生产者: sendData->97
消费者: receive->OrderEvent(value=96)
生产者: sendData->98
消费者: receive->OrderEvent(value=97)
生产者: sendData->99
消费者: receive->OrderEvent(value=98)
可以看到,生产顺序与消费顺序都是有序的,并且实现了生产者逻辑与消费者逻辑的解耦。
本文主要从案例出发,对disruptor先入为主的进行了体验,后续我们会深入各个组件详细认识这个高性能线程间消息传递库。
评论
盘点一个使用超级鹰识别验证码并自动登录的案例
点击上方“Python共享之家”,进行关注回复“资源”即可获赠Python学习资料今日鸡汤江上几人在,天涯孤棹还。大家好,我是皮皮。一、前言前几天在Python钻石交流群【静惜】问了一个Python实现识别验证码并自动登录的问题,提问截图如下:验证码的截图如下所示:二、实现过程这里大家激烈的探讨,【
IT共享之家
0
高并发实战案例 100 讲,已更新24节,即将涨价,抓紧了
大家好,我是路人。本人亲自录制的《Java 高并发 & 微服务 & 性能调优实战案例 100 讲》已发布了 24 个课时,干货满满。59元,一杯咖啡的价格,100个实战案例,即将涨价到 99,需要的小伙伴速度啦,文末附下单方式。已发布 24 个课时1、SpringBoot实
路人甲Java
0
前端框架新势力大盘点
点击上方 前端Q,关注公众号回复加群,加入前端Q技术交流群近年来,前端领域快速发展,新的框架不断涌现,为开发者提供了更多选择和解决方案。尽管 React、Vue、Angular、Next.js、Preact 等老牌框架依然稳坐市场主流,但新势力前端框架的崛起也为特定场景带来了更佳的适配和优
前端Q
0
常见 Cilium 使用案例
常见20个 Cilium 使用案例(2)本文翻译来源于 Isovalent 网站的Top 20 Cilium Use Cases(https://isovalent.com/blog/post/top-20-cilium-use-cases/)。由于平台限制,原文中的一些超链接被移除。Cilium是
k8s技术圈
2
110 个 Java 主流组件和框架整理,常用的应有尽有,建议收藏!!
点击关注公众号,Java 干货及时推送↓推荐阅读:铜三铁四,怒拿 35K * 14 薪!整理:四猿外以下排序是按照从技术组件到开发框架到代码工具,也有一些实在不好分类的,就放到最后了。WEB 容器Tomcathttps://tomcat.apache.org/Jettyhttps://ww
Java技术栈
0
网络工程案例:某学校机房项目交换机的配置
一、学校项目配置案例某校计算机系承办市中考电脑阅卷任务,市教育局要求学校提供四百台电脑供改卷教师使用,同时需要4台配置性能较高的服务器以供四百台客户端电脑访问。该校计算机系四百台电脑分布在7间机房中,共由4个IP网段组成。一、要求:为了安全起见,要求处在4个网段的电脑相互之间不能访问,但所有的电脑均
数据中心运维管理
10
判了!涉电动自行车6种典型案例
当前,电动自行车已成为人们出行代步的重要工具之一,但因违规停放、违规充电或改装、更换锂电池引发的事故触目惊心。近期,北京通报多起判罚案例,包含以下6种典型情形,让我们以案说法,通过案例了解一下车主、物业、卖家、房东等不同群体需要承担的责任。情形1将电动自行车停在楼梯间罚款850近日,北京经开区消防救
盱眙老妹
0
大模型“芝麻开门”的首选框架,LangChain全新教程!附600分钟详解视频
LangChain 作为一个开源的大语言模型应用框架,自诞生之日起就备受瞩目。然而,它的发展之路却走过了不少曲折。一开始,LangChain 遭受了不少质疑和非议。有人认为它只适合入门学习,代码质量和设计缺乏工业级的严谨性,难以应用于生产环境。的确,作为一个新兴项目,LangChain 的早期版本还
机器学习算法与Python实战
10