> 文章列表 > [Netty] Selector选择器以及Reactor实现 (七)

[Netty] Selector选择器以及Reactor实现 (七)

[Netty] Selector选择器以及Reactor实现 (七)

文章目录

      • 1.Nio中的Selector介绍
        • 1.1 Selector
        • 1.2 SelectionKey
        • 1.3 ServerSocketChannel
        • 1.4 SocketChannel
      • 2.Netty中NioEventLoop的选择器
      • 3.Netty对Reactor的实现

1.Nio中的Selector介绍

通过Selector多路复用器实现IO的多路复用, Selector可以监听多个连接的Channel事件, 同事可以不断的查询已注册Channel是否处于就绪状态, 实现一个线程高效管理多个Channel

  1. Selector能够同时检测对个注册的通道上是否有事件发生
    多个Channel以事件的方式注册到同一个Selector。
  2. 如果有事件发生,就获取事件,然后对事件进行处理,只使用一个线程就管理了多个通道。
  3. 只有在一个通道真正有读写事件时,才会执行。

[Netty] Selector选择器以及Reactor实现 (七)

Selector、SelectionKey、ServerSocketChannel和SocketChannel的关系

  1. 客户端连接时候, 会通过ServerSocketChannel得到SocketChannel
  2. SocketChannel注册入Selector, 通过SelectableChannel注册
  3. 注册后返回SelectionKey
  4. Selector使用select方法进行监听
  5. 返回有事件发生的通道的SelectionKey进而获取到Channel

这里的Selector, SelectionKey, ServerSocketChannel, SocketChannel都是Nio中的。

NIO中的ServerSocketChannel功能类似java jdk中的ServerSocket,SocketChannel功能类似Socket

1.1 Selector

[Netty] Selector选择器以及Reactor实现 (七)

  • slector.select();//阻塞
  • slector.select(1000);//阻塞1000毫秒,在1000毫秒后返回
  • slector.WAKEUP();//唤醒slector
  • slector.selectNow();//不阻塞,立马返回

1.2 SelectionKey

标识Selector和网络通道的注册关系

public abstract class SelectionKey {public abstract Selector selector();//得到与之关联的Selector对象public abstract SelectableChannel channel();//得到与之关联的通道public final Object attachment();//得到与之关联的共享数据public abstract SelectionKey interestOps(int ops);//设置或改变监听事件public final boolean isAcceptable();//是否可以acceptpublic final boolean isReadable();//是否可以读public final boolean isWritable();//是否可以写
}

[Netty] Selector选择器以及Reactor实现 (七)

  • int OP_ACCEPT:有新的网络连接可以accept,值为16
  • int OP_CONNECT:代表连接已经建立,值为8
  • int OP_READ:代表读操作,值为1
  • int OP_WRITE:代表写操作,值为4

1.3 ServerSocketChannel

ServerSocketChannel在服务器端监听薪的客户端Socket连接

public abstract class ServerSocketChannelextends AbstractSelectableChannelimplements NetworkChannel
{public static ServerSocketChannel open();//得到一个ServerSocketChannel通道public final ServerSocketChannel bind(SocketAddress local);//设置服务器端端口号public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,取值false标识采用非阻塞模式public abstract SocketChannel accept();//接受一个连接,返回代表这个连接的通道对象public final SelectionKey register(Selector sel, int ops);//注册一个选择器并设置监听事件
}

[Netty] Selector选择器以及Reactor实现 (七)

1.4 SocketChannel

网络IO通道, 具体负责进行读写操作, Nio把缓冲区的数据写入到通道, 或者把通道的数据写入缓冲区

public abstract class SocketChannelextends AbstractSelectableChannelimplements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel
{public static SocketChannel open();//得到一个SocketChannel通道public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,取值false标识采用非阻塞模式public abstract boolean connect(SocketAddress remote);//连接服务器public abstract boolean finishConnect();//如果上面的方法连接失败,接下来就要通过该方法完成连接操作public abstract int write(ByteBuffer src);//往通道里写数据public abstract int read(ByteBuffer dst);//从通道里读数据public abstract SelectionKey register(Selector sel, int ops, Object att);//注册一个选择器并设置监听事件,最后一个参数可以设置共享数据public final void close();//关闭通道
}

[Netty] Selector选择器以及Reactor实现 (七)

2.Netty中NioEventLoop的选择器

Netty中的选择器是通过NioEventLoop.run()方法中的select方法去调用Nio包下的selector选择器实现的

[Netty] Selector选择器以及Reactor实现 (七)

NioEventLoop.run()

run 方法的主基调肯定是死循环等待 I/O 事件产生, 然后处理事件

    @Overrideprotected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}

[Netty] Selector选择器以及Reactor实现 (七)

  • select 等待 I/O 事件的发生
  • 处理发生的 I/O 事件
  • 处理提交至线程中的任务, 包括提交的异步任务、定时任务、尾部任务

[Netty] Selector选择器以及Reactor实现 (七)

这个策略会根据最近将要发生的定时任务的执行时间来控制 select 最长阻塞的时间。

[Netty] Selector选择器以及Reactor实现 (七)

select方法中的是Nio包下的selector

3.Netty对Reactor的实现

  • Reactor: I/O多路复用, 当多条连接共用一个阻塞对象后,进程只需要在一个阻塞对象上等待,而无需再轮训所有连接,常见的实现方式有select,epoll,kqueue等。
  • Reactor: 是一种开发模式, 为反应堆, 代表事件反应, 有单线程, 多线程, 主从多线程

Reactor单线程模式

在这里插入图片描述

EventLoopGroup eventGroup = new NioEventLoopGroup(1)
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventGroup);

非主从Reactor多线程模式

在这里插入图片描述

EventLoopGroup eventGroup = new NioEventLoopGroup()
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventGroup);

主从Reactor多线程模式

在这里插入图片描述

EventLoopGroup bossGroup = new NioEventLoopGroup()
EventLoopGroup workerGroup = new NioEventLoopGroup()
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);

老人咖美文