NIO之多线程协作处理数据读写

源码学徒

共 8346字,需浏览 17分钟

 · 2021-07-04

有道无术,术尚可求也!有术无道,止于术!

经过前面几章的学习,我们已经 能够掌握了JDK NIO的开发方式,我们来总结一下NIO开发的流程:

  1. 创建一个服务端通道 ServerSocketChannel
  2. 创建一个选择器 Selector
  3. 将服务端通道注册到选择器上,并且关注我们感兴趣的事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  4. 绑定服务管道的地址 serverSocketChannel.bind(new InetSocketAddress(8989));
  5. 开始进行事件选择,选择我们感兴趣的事件做对应的操作!

具体的代码信息请参照第一章:多路复用模型章节,这里不做太多的赘述!

有关多路复用的概念,我们也在第一章进行了分析。多路复用模型能够最大限度的将一个线程的执行能力榨干,一条线程执行所有的数据,包括新连接的接入、数据的读取、计算与回写,但是假设,我们的数据计算及其缓慢,那么该任务的执行就势必影响下一个新链接的接入!

传统NIO单线程模型

单线程的NIO模型

如图,我们能了解到,单线程情况下,读事件因为要做一些业务性操作(数据库连接、图片、文件下载)等操作,导致线程阻塞再,读事件的处理上,此时单线程程序无法进行下一次新链接的处理!我们对该线程模型进行优化,select事件处理封装为任务,提交到线程池!

NIO多线程模型

image-20210416170643079

上面的这种数据结构能够解决掉因为计算任务耗时过长,导致新链接接入阻塞的问题,我们能否再次进行一次优化呢?

我们能否创建多个事件选择器,每个事件选择器,负责不同的Socket连接,就像下面这种:

NIO多线程优化模型

image-20210416123153046

这样我们就可以每一个Select选择器负责多个客户端Socket连接,主线程只需要将客户端新连接选择一个选择器注册到select选择器上就可以了!所以我们的架构图,就变成了下图这样:

image-20210416134148671

我们在select选择器内部处理计算任务的时候,也可以将任务封装为task,提交到线程池里面去,彻底将新连接接入和读写事件处理分离开,互不影响!事实上,这也是Netty的核心思想之一,我们可以根据上面的图例,自己简单写一个:

代码实现

构建一个事件执行器 对应上图的select选择器

/**
* Nio事件处理器
*
* @author huangfu
* @date
*/

public class MyNioEventLoop implements Runnable {
static final ByteBuffer ALLOCATE = ByteBuffer.allocate(128);
private final Selector selector;
private final LinkedBlockingQueue<Runnable> linkedBlockingQueue;
public MyNioEventLoop(Selector selector) {
this.selector = selector;
linkedBlockingQueue = new LinkedBlockingQueue<>();
}

public Selector getSelector() {
return selector;
}

public LinkedBlockingQueue<Runnable> getLinkedBlockingQueue() {
return linkedBlockingQueue;
}

//忽略 hashCode和eques

/**
* 任务处理器
*/

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
//进行事件选择 这里我们只处理读事件
if (selector.select() > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
//处理读事件
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
iterator.remove();
if (next.isReadable()) {
SocketChannel channel = (SocketChannel) next.channel();
int read = channel.read(ALLOCATE);
if(read > 0) {
System.out.printf("线程%s【%s】发来消-息:",Thread.currentThread().getName(), channel.getRemoteAddress());
System.out.println(new String(ALLOCATE.array(), StandardCharsets.UTF_8));
}else if(read == -1) {
System.out.println("连接断开");
channel.close();
}
ALLOCATE.clear();
}
}
selectionKeys.clear();
}else {
//处理异步任务 进行注册
while (!linkedBlockingQueue.isEmpty()) {
Runnable take = linkedBlockingQueue.take();
//异步事件执行
take.run();
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
}

构建一个选择器组

/**
* 选择器组
*
* @author huangfu
* @date 2021年3月12日09:44:37
*/

public class SelectorGroup {
private final List<MyNioEventLoop> SELECTOR_GROUP = new ArrayList<>(8);
private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
private final static AtomicInteger IDX = new AtomicInteger();

/**
* 初始化选择器
* @param count 处理器数量
* @throws IOException 异常欣喜
*/

public SelectorGroup(int count) throws IOException {

for (int i = 0; i < count; i++) {
Selector open = Selector.open();
MyNioEventLoop myNioEventLoop = new MyNioEventLoop(open);
SELECTOR_GROUP.add(myNioEventLoop);
}
}

public SelectorGroup() throws IOException {
this(AVAILABLE_PROCESSORS << 1);
}

/**
* 轮询获取一个选择器
* @return 返回一个选择器
*/

public MyNioEventLoop next(){
int andIncrement = IDX.getAndIncrement();
int length = SELECTOR_GROUP.size();

return SELECTOR_GROUP.get(Math.abs(andIncrement % length));
}
}

构建一个执行器记录器

/**
* @author huangfu
* @date
*/

public class ThreadContext {
/**
* 记录当前使用过的选择器
*/

public static final Set<MyNioEventLoop> RUN_SELECT = new HashSet<>();
}

构建一个新连接接入选择器

/**
* 连接器
*
* @author huangfu
* @date 2021年3月12日10:15:37
*/

public class Acceptor implements Runnable {
private final ServerSocketChannel serverSocketChannel;
private final SelectorGroup selectorGroup;

public Acceptor(ServerSocketChannel serverSocketChannel, SelectorGroup selectorGroup) {
this.serverSocketChannel = serverSocketChannel;
this.selectorGroup = selectorGroup;
}


@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
MyNioEventLoop next = selectorGroup.next();

//向队列追加一个注册任务
next.getLinkedBlockingQueue().offer(() -> {
try {
//客户端注册为非阻塞
socketChannel.configureBlocking(false);
//注册到选择器 关注一个读事件
socketChannel.register(next.getSelector(), SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
}
});
//唤醒对应的任务,让其处理异步任务
next.getSelector().wakeup();


System.out.println("检测到连接:" + socketChannel.getRemoteAddress());
//当当前选择器已经被使用过了 就不再使用了,直接注册就行了
if (ThreadContext.RUN_SELECT.add(next)) {
//启动任务
new Thread(next).start();
}


} catch (IOException e) {
e.printStackTrace();
}
}
}

创建启动器

/**
* 反应器
*
* @author huangfu
* @date 2021年3月12日10:15:14
*/

public class Reactor implements Runnable {
private final Selector selector;

public Reactor(Selector selector) {
this.selector = selector;
}

@Override
public void run() {
try {
System.out.println("服务启动成功");
while (!Thread.currentThread().isInterrupted()) {
//d等待连接事件
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
iterator.remove();
//进行数据分发
dispatch(next);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 将新连接分发到新连接接入器
* @param next 所有事件主键
*/

private void dispatch(SelectionKey next) {
Runnable attachment = (Runnable) next.attachment();
if(attachment!=null) {
attachment.run();
}
}
}

启动测试

/**
* @author huangfu
* @date
*/

public class TestMain {

public static void main(String[] args) throws IOException {
//创建一个选择器组 传递选择器组的大小 决定使用多少选择器来实现
SelectorGroup selectorGroup = new SelectorGroup(2);
//开启一个服务端管道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//开启一个服务端专用的选择器
Selector selector = Selector.open();
//设置非阻塞
serverSocketChannel.configureBlocking(false);
//创建一个连接器
Acceptor acceptor = new Acceptor(serverSocketChannel, selectorGroup);
//将服务端通道注册到服务端选择器上 这里会绑定一个新连接接入器
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, acceptor);
//绑定端口
serverSocketChannel.bind(new InetSocketAddress(8989));
//启动处理器
new Reactor(selector).run();
}
}

总结

  1. 单线程下的NIO存在性能瓶颈,当某一计算过程缓慢的时候会阻塞住整个线程,导致影响其他事件的处理!
  2. 为了解决这一缺陷,我们提出了使用异步线程的方式去操作任务,将耗时较长的业务,封装为一个异步任务,提交到线程池执行!
  3. 为了使业务操作和新连接接入完全分离开,我们做了另外一重优化,我们封装了一个选择器组,轮询的方式获取选择器,每一个选择器都能够处理多个新连接, socket连接->selector选择器 = 多 -> 1,在每一个选择器里面又可以使用线程池来处理任务,进一步提高吞吐量!

才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎关注作者的公众号,一起进步,一起学习!



❤️「转发」「在看」,是对我最大的支持❤️



浏览 16
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报