> 文章列表 > Chapter13-基于 Netty 的通信实现

Chapter13-基于 Netty 的通信实现

Chapter13-基于 Netty 的通信实现

13.l Netty 介绍

         Netty 是一个网络应用框架,或者说是一个 Java 网络开发库。 Netty 提供异步事件驱动的方式,使用它可以快速地开发出高性能的网络应用程序,比如客户端/服务器自定义协议程序,大大简化了网络程序的开发过程。

        Netty 是一个精心设计的框架,它从许多协议实现中吸收了丰富的经验,比如 FTP , SMTP 、 HTTP 等许多基于二进制和文本的传统协议。 借助 Netty ,可以比较容易地开发 出达到 Java 网络专家+并发编程专家水平的通信程序 。

        了解 Netty 前需要对 Java NIO 有个基本的了解 ,熟悉 Channel 、 ByteBuffer 、Selector 等基本概念。 对于 Java 网络编程经验不多的读者,可以试着先用 JavaNIO 的基本类写一个简单的 Client/Server 程序,然后再用 Netty 对比着实现一遍,这样比较容易理解 Netty 里各种组件存在的原因 。

13.2 Netty 架构总览

        Netty 主要分为三部分: 一是底层的零拷贝技术和统一通信模型;二是基于 JVM 实现的传输层; 三是常用协议支持。

        13.2.1 重新实现 Byte Buffer 

        在网络通信中, CPU 处理数据的速度大大快于网络传输数据的速度,所以需要引入缓冲区,将网络传输的数据放入缓冲区,累积足够的数据再发给 CPU处理。

        Netty即使用自己重新实现的 bufferAPI ,而不是使用 NIO 的 ByteBuffer 来表示一个连续的字节序列 。 新实现的 buffer 类型 ByteBuf 可以从底层解决 ByteBuffer的一些问题,是一种更适合日常网络应用开发需要的缓存类型 。 重新实现的 ByteBuf 特性包括允许 使用自定义的缓存类型、透明的零拷贝实现、比ByteBuffer 更快的响应速度等。

        字节缓存在网络通信中会被频繁地使用, ByteBuf 实现的是一个非常轻量级的字节数组包装器。 ByteBuf 有读操作和写操作,为了便于用户使用,该缓冲 区维护了读索引和写索引 。 ByteBuf 由 三个片段构成:废弃段、可读段和可写段。 其中,可读段表示缓冲区实际存储的可用数据。 当用户使用 read 或者skip 方法时,将会增加读索引 。 读索引之前的数据将进入废弃段,表示该数据已被使用过了 。 此外, 用户可主动使用 discardReadBytes 清空废弃段以便得到更多的可写空间 。 简单来说和 ByteBuffer 相比, ByteBuf 用在网络编程时更合适,更易用 。

        13.2.2 统一的异步 I/O 接口

         传统的 Java I/O API 在应对不同的传输协议时需要使用不同的类型和方法 。例如 java.net.Socket 和 java.net.DatagramSocket ,但它们没有相同的父类型,因此需要使用不同的调用方式执行 Socket 操作 。 因为在模式上不匹配,所以更换网络应用的传输协议时工作会变得很繁杂。 由于( Java I/O API) 缺乏协议间的可移植性,无法在不修改网络传输层的前提下增加多种协议的支持。 从理论上讲,多种应用层协议可运行在多种传输层协议之上,例如 TCP/IP 、 UDP/IP 、SCTP 和串口通信。

        还有个复杂的情况是, Java 的新 I/O (NIO) API 与原有的阻塞式 I/O (OIO) API 不兼容。 这两者无论是在设计上还是在性能上,其特性都不相同,可是在开发时一般只选择某一种 API 。 例如,在用户数较小的时候可以选择使用传统的 OIO (Old I/O)API,毕竟与 NIO 相比使用 OIO 更加容易;但是当业务快速增长,服务器需要同时处理成千上万的客户连接时间题就来了,这时候不得不尝试使用 NIO 来解决,新的 NIO Selector 编程接口和 Old I/O 差别很大,很难做到快速升级。

        Netty 有一个被称为 Channel 的统一异步 I / O 编程接口,这个编程接口抽象了所有点对点的通信操作 。 这样,如果应用是基于 Netty 的某一种传输方式来实现的,则可以快速迁移到另一种传输实现上。 Netty 提供了几种拥有相同编程接口的基本传输实现:

  • 基于 NIO 的 TCP/ IP 传输( io.netty.channel.nio ); 
  • 基于 OIO 的 TCP/ IP 传输( io.netty.channel.oio ); 
  • 基于 OIO 的 UDP/ IP 传输( io.netty.channel.oio); 
  • 本地传输( io.netty. channel.local ) 。

        切换不同的传输实现通常只需修改几行代码,而且由于核心 API 具有高度的可扩展性,很容易定制自己的传输实现。

        13.2.3 基于拦截链模式的事件模型

        一个定义良好并具有扩展能力的事件模型可以大大提高事件驱动程序的效率, Netty 就具有定义良好的 1/0 事件模型,它采用严格的层次结构来区分不同的事件类型, Netty 也允许在不破坏现有代码的情况下实现自己的事件类型 。 事件模型是 Netty 的一个亮 点,很多 NIO 通信框架没有或者仅有有限的事件模型概念,当需要一个新的事件类型的时候常常需要修改已有的代码,有的甚至不允许进行自定义的扩展。

        在 Netty 中, ChannelPipeline 内部的一个 ChannelEvent 被一组 ChannelHandler处理。 这个管道是 Intercepting Filter( 拦截过滤器)模式的一种高级形式的实现,因此对于一个事件如何被处理,以及管道内部处理器间的交互过程,用户拥有绝对的控制力 。 

        13.2.4 高级组件 

         Netty 提供了一系列的高级组件来让开发过程更加快捷,比如 Codec 框架、SSL/TLS 支持 、 HTTP 实现等 。

        首先看看 Codec 框架。 从业务逻辑代码中分离协议处理部分可以让代码结构变得更清晰,但是如果从零开始实现会有很高的复杂性,比如处理分段消息,相互叠加的多层协议,还有些协议复杂到无法在一台 独立的状态机上实现。 Netty 提供了一组构建在其核心模块之上的 codec 实现,是一种可扩展 、 可重用、可单元测试,并且是多层的 codec 框架,为用户提供容易维护的 codec代码。

        Netty 还提供对 SSL/TLS 的支持,不同于传统阻塞式的 IIO 实现,在 NIO模式下支持 SSL 功能不能只是简单地包装一下流数据并进行加密或解密工作,还需要借助于 javax.net.ssl.SSLEngine 。 SSLEngine 是一个有状态的实现,使用 SSLEngine 必须管理所有可能的状态,例如密码套件 、 密钥协商(或重新协商)、证书交换以及认证等,而且 SSLEngine 不是一个绝对的线程安全实现。 在Netty 内部, SslHandler 封装了所有艰难的细节,以及使用 SSLEngine 可能带来的陷阱。 用户只需要配置并将该 SslHandler 插入你的 ChannelPipeline 中即可,而且 Netty 允许实现像 StartTIS 那样的高级特性。

        HTTP 是互联网上最受欢迎的协议,与现有的 HTTP 实现相比, Netty 的HTTP 实现是相当与众不同的 。 在 HTTP 消息的低层交互过程中用户拥有绝对的控制力,因为 Netty 的 HTTP 实现只是一些 HTTP Codec 和 HTTP 消息类的简单组合,不存在任何限制,例如那种被迫选择的线程模型 。 用户可以根据自己的需求编写那种可以完全按照你期望的工作方式工作的客户端或服务器端代码,比如线程模型、连接生命期 、 快编码等。 基于这种高度可定制化的特性,用户可以开发一个非常高效的 HTTP 服务器 ,例如要求持久化链接以及服务器端推送技术的聊天服务,需要保持链接直至整个文件下载完成的媒体流服务,需要上传大文件并且没有 内 存压力的文件服务 , 支持大规模混合客户 端应用用于连接以万计的第三方异步 web 服务等 。

        Netty 的 WebSockets 实现, Web Sockets 允许双向 ,全双工通信信道。 在TCP socket 中 ,它被设计为允许一个 Web 浏览器和 Web 服务器之间通过数据流交互。 Web Socket 协议已经被IETF 列为RFC6455 规范 ,并且 Netty 实现了RFC 6455 和一些老版本的规范 。

        此外 Netty 还支持 GoogleProtocolBuffer,GoogleProtocolBuffers 是快速实现一个高效的二进制协议的理想方案 。 通过使用 Protobuffincoder 和ProtobufDecoder ,我们可以把 GoogleProtocolBuffers 编译器( protoc )生成的消息类放入 Netty 的 codec 实现中 。

13.3 Netty 用法示例 

git@gitee.com:li-yingjiann/netty-source-code.git

         13.3.1 Discard 服务器 

        世上最简单的协议不是“ HelloWorld !”而是 DISCARD 服务器。 这个协议会抛弃任何收到的数据而不响应 。 实现 DISCARD 协议只需忽略所有收到的数据。 我们从 Handler ( 处理器) 的实现开始 , Handler 是由 Netty 生成用来处理I/O 事件的 

public class DiscardServerHandler extends SimpleChannelInboundHandler<Object> {@Overridepublic void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {// discard}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}

        DiscardServerHandler 继承自 ChannellnboundHandlerAdapter ,这个类实现了 ChannellnboundHandler 接口, ChannellnboundHandler 提供了许多事件处理的接口方法,我们可以覆盖这些方法。 只 需要继承 Channellnbound­Handler Adapter 类而不用自己去实现接 口方法 。

        这里我们覆盖 了 chanelRead () 事件处理方法。每当 从 客 户端收到新的数据时,这个方法会 在收到消息时被调用 , 这个例 子中 , 收到的消息类型是ByteBuf。

        为了实现 DISCARD 协议,处理器不得不忽略所有接收到的消息。 ByteBuf是一个引用计数对象 ,这个对象必须显式地调用 release ()方法来释放。 注意处理器的职责是释放所有传递到处理器的引用计数对象。 

// io.netty.channel.SimpleChannelInboundHandler#channelRead@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {boolean release = true;try {//检查msg类型是否与泛型匹配if (acceptInboundMessage(msg)) {@SuppressWarnings("unchecked")I imsg = (I) msg;//ChannelRead0只有在类型匹配时执行,因为添加在String解码器前所以是不会执行的channelRead0(ctx, imsg);} else {release = false;ctx.fireChannelRead(msg);}} finally {if (autoRelease && release) {ReferenceCountUtil.release(msg);}}}

        在出现 Throwable 对象 ,即当 Netty 由于 IO 错误或者处理器在处理事件抛出异常时, exception Caught()事件处理方法会被调用 。 在大部分情况下,捕获的异常应该被记录下来并且把关联的 Channel 关闭掉。 通常在遇到不同的异常情况下会实现不同的处理方法,比如可能想在关闭连接之前发送一个错误码的响应消息 。

// io.netty.example.discard.DiscardServer
/*** Discards any incoming data.*/
public final class DiscardServer {static final boolean SSL = System.getProperty("ssl") != null;static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));public static void main(String[] args) throws Exception {// Configure SSL.final SslContext sslCtx;if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate();sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();} else {sslCtx = null;}EventLoopGroup bossGroup = new NioEventLoopGroup(1);                            // (1)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();                                          // (2)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)                                             // (3).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {                                        // (4)ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(new DiscardServerHandler());}}).option(ChannelOption.SO_BACKLOG,128)                                         // (5).childOption(ChannelOption.SO_KEEPALIVE,true);                                // (6)// Bind and start to accept incoming connections.ChannelFuture f = b.bind(PORT).sync();                                              // (7)// Wait until the server socket is closed.// In this example, this does not happen, but you can do that to gracefully// shut down your server.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}

       NioEventLoopGroup 是用来处理 I/O 操作的多线程事件循环器, Netty 提供了许多不同的 EventLoopGroup 的实现来处理不同的传输类型。 在这个例子中我们实现了一个服务端的应用,因此会有 2 个 NioEventLoopGroup 被使用 。 第一个经常被叫做“ boss ”,用来接收进来的连接; 第二个经常被 叫做“ worker",用来处理已经被接收的连接。-旦“ boss ”接收到连接,就会把连接信息注册到“ worker ,, 上 。 如何知道 多少个线程已经被使用,如何映射到已经创建的Channel 上都需要依赖于 EventLoopGroup 的实现,并且可以通过构造函数来配置他们的关系 。

        ServerBootstrap 是一个启动 NIO 服务的辅助启动类 。 可以在这个服务中直接使用 Channel ,但处理过程比较复杂,一般不需要这样做。代码中我们指定使用 NioServerSocketChannel 类来说明 一个新的 Channel如何接收进来的连接。

        这里的事件处理类经常会被用来处理一个最近的已经接收的 Channel 。Channellnitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel 。 我们可以通过增加一些处理类比如 DiscardServerHandler 来配置一个新的 Channel 或者其对应的 Channe!Pipeline 来实现网络程序。 当网络程序变得复杂时,可以增加更多的处理类到 pip line 上,然后提取这些匿名类到最顶层的类上 。
        可以设置代码中指定的 Channel 的配置参数,这是一个 TCP/IP 的服务端程序,因此我们要设置 Socket 的参数选项比如 tcpNoDelay 和 keepAlive 。详细内容可以参考 Channel Option 和 Channe!Config 实现的接口文档,来对Channel Option 有一个大致的认识。

        option()是提供给 NioServerSocketChannel 用来接收进来的连接。childOption()是提供给由父管道 ServerChannel 接收到的连接,在这个例子中也就是 NioServerSocketChannel 。

剩下的就是绑定端口然后启动服务。 这里是绑定了机器所有网卡上的 8080端口 。 现在也可以多次调用 bind ()方法来绑定不同的地址 。

        13.3.2 查看收到的数据

         上一节我们已经编写了 Discard 服务端,现在需要测试一下它是否真的可以运行。 最简单的测试方法是使用 telnet 命令。 例如可以在命令行上输入 telnet localhost 8080 或者其他类型参数。 但是我们不能确定这个服务端是否正常运行,因为它是一个 Discard 服务,没法得到任何响应。 为了证明程序仍然在正常工作,我们需要修改服务端的程序来打印出它到底接收到了什么 。我们已经知道 channe!Read ()方法是在数据被接收的时候调用 。 让我们在DiscardServerHandler 类的 channelRead ()方法里添加一些代码

    @Overridepublic void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf) msg;while (in.isReadable()) {
//            System.out.println((char)in.readByte());System.out.println(in.toString(CharsetUtil.US_ASCII));System.out.flush();}}

        可以在这里调用 in.release(),如果 再次运行 telnet 命令,我们就能看到服务端会打印出它所接收到的消息 。

13.4 RocketMQ 基于 Netty 的通信功能实现

        RocketMQ 底层通信的 实 现是在 Remoting 模块里,因为借助了 Netty ,RocketMQ 的通信部分没有很多的代码,就是用 Netty 实现了 一个自定义协议的客户端 / 服务器程序 。

        13.4.1 顶层抽象类 

        RocketMQ 通信模块的顶层结构是 RemotingServer 和 RemotingClient ,分别对应通信的服务端和 客户端。 首先看看 RemotingServer

         RemotingServer 类 中比 较重要 的是 : localListenPort 、 registerProcessor 和registerDefaultProcessor , registerDefaultProcessor 用 来设置接 收到消息后的处理方法。

        RemotingClient 类 和 RemotingServer 类 相对应, 比较重要的方法是updateNameServer AddressList 、 invokeSync 和 invokeOneway , updateName-ServerAddressList 用来获取有效 的 NameServer 地址 , inv okeSync 与 invokeOneway用来向 Server 端发送请求,

        13.4.2 自定义协议

        NettyRemotingServer 和 NettyRemotingClient 分别实现了 RemotingServer与 RemotingClient 这两个接口,但它们有很多共有的内容,比如 invokeSync 、invokeOnew町 等 ,所以这些共有函数被提取到 NettyRemotingAbstract 共同继承的父类中 。 首先来分析一下在 NettyRemotingAbstract 中是如何处理接收到的内容的

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand

无论是服务端还是客户端都需要处理接收到的请求,处理方法由processRequestCommand 定义,注意这里接收到的消息已 经被转换成 Remoting­Command 了,而不是原始的字节流。

        RemotingCommand 是 RocketMQ 自定义的协议

        这个协议只有四部分,但是覆盖了 RocketMQ 各个角色间几乎所有的通信过程, RemotingCommand 有实际的数据类型 和各部分对应

        RocketMQ 各个组件间的通信需要频繁地在字节码和 RemotingCommand 间相互转换,也就是编码、解码过程,好在 Netty 提供了 codec 支持,这个频繁的操作只需要一行设置就可以完成 : pipeline()addLast ( new NettyEncoder(), new NettyDecoder() );

        同步发送消息的实现

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl

        函数的 RemotingCommand 来自对要发送消息的封装,输入参数 Channel来自 io.netty.channel 。 Channel 是通信的入口, Channel 对象的获取,对于服务端和客户端来说差别很大。 对客户端来说,由于是主动获取消息的一方 , 需要向哪个地址发送消息,于是通过 Netty 的 Bootstrap 方法创建一个连接( 同时把连接后的 Channel 保存起来,避免每发一个消息都重新创建连接);对服务端来说,很少主动发送消息,服务端一直在监听某个端口,当有一个连接请求进入后,服务端会把创建的 Channel 对象保存下来,供偶尔需要向客户端主动发消息的时候使用 。 

        13.4.3 基于 Netty 的 Server 和 Client

        基于 Netty 实 现的 Server 或 Client 程序, 具体代码在 NettyRemotingServer和 NettyRemotingClient 这两个 类 中,我们从 erverBootstrap 的初始化来 看RocketMQ 是如何基于 Netty 实现 Server 端程序的

org.apache.rocketmq.remoting.netty.NettyRemotingServer#start

        ServerBootStrap 的 BossEventLoop 使用的 是单线程 的 NioEventLoopGroup,workerEventLoop 在 Linux 平台使用的是默认 3 个线程的 EpollEventLoopGroup,在非 Linux 平台使用的 是 3 个 线程 的 NioEventLoopGroup 。 在 最 后几行代码中还可以 看 到添加了 Netty Encoder 和 Netty Decoder 这 两个 Handler 。 这些Handler 执行在一个 8 线程的 DefaultEventExecutorGroup 中 。

        RocketMQ 对通信过程的另一个抽象是 Processor 和 Executor ,当接收到一个消息后 , 直接根据消息的类型调用对应的 Processor 和 Executor ,把通信过程和业务逻辑分离开来。 我们通过一个 Broker 中的代码段来看看注册 Processor的过程

org.apache.rocketmq.broker.BrokerController#registerProcessor

        可以看出通过 RocketMQ 所做的抽象、 通信逻辑和信息处理逻辑被分离开 , 使结构变得非常清晰。