> 文章列表 > Netty详解,5分钟了解,面试不用慌

Netty详解,5分钟了解,面试不用慌

Netty详解,5分钟了解,面试不用慌

1. 概述

1.1 Netty 是什么?

Netty is an asynchronous event-driven network application framework

for rapid development of maintainable high performance protocol servers & clients.

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

1.2 Netty 的作者

他还是另一个著名网络应用框架 Mina 的重要贡献者

1.3 Netty 的地位

Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位

以下的框架都使用了 Netty,因为它们有网络通信需求!

Cassandra - nosql 数据库

Spark - 大数据分布式计算框架

Hadoop - 大数据分布式存储框架

RocketMQ - ali 开源的消息队列

ElasticSearch - 搜索引擎

gRPC - rpc 框架

Dubbo - rpc 框架

Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端

Zookeeper - 分布式协调框架

1.4 Netty 的优势

Netty vs NIO,工作量大,bug 多

需要自己构建协议

解决 TCP 传输问题,如粘包、半包

epoll 空轮询导致 CPU 100%

对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer

Netty vs 其它网络应用框架

Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀

久经考验,16年,Netty 版本

2.x 2004

3.x 2008

4.x 2013

5.x 已废弃(没有明显的性能提升,维护成本高)

2. Hello World

2.1 目标

开发一个简单的服务器端和客户端

客户端向服务器端发送 hello, world

服务器仅接收,不返回

加入依赖

<dependency>

    <groupId>io.netty</groupId>

    <artifactId>netty-all</artifactId>

    <version>4.1.39.Final</version>

</dependency>

Copy

2.2 服务器端

new ServerBootstrap()

    .group(new NioEventLoopGroup()) // 1

    .channel(NioServerSocketChannel.class) // 2

    .childHandler(new ChannelInitializer<NioSocketChannel>() { // 3

        protected void initChannel(NioSocketChannel ch) {

            ch.pipeline().addLast(new StringDecoder()); // 5

            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6

                @Override

                protected void channelRead0(ChannelHandlerContext ctx, String msg) {

                    System.out.println(msg);

                }

            });

        }

    })

    .bind(8080); // 4

Copy

代码解读

1 处,创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector 后面会详细展开

2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有

3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器

4 处,ServerSocketChannel 绑定的监听端口

5 处,SocketChannel 的处理器,解码 ByteBuf => String

6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果

2.3 客户端

new Bootstrap()

    .group(new NioEventLoopGroup()) // 1

    .channel(NioSocketChannel.class) // 2

    .handler(new ChannelInitializer<Channel>() { // 3

        @Override

        protected void initChannel(Channel ch) {

            ch.pipeline().addLast(new StringEncoder()); // 8

        }

    })

    .connect("127.0.0.1", 8080) // 4

    .sync() // 5

    .channel() // 6

    .writeAndFlush(new Date() + ": hello world!"); // 7

Copy

代码解读

1 处,创建 NioEventLoopGroup,同 Server

2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现,其它实现还有

3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器

4 处,指定要连接的服务器和端口

5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕

6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作

7 处,写入消息并清空缓冲区

8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出

数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程

2.4 流程梳理

一开始需要树立正确的观念

把 channel 理解为数据的通道

把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf

把 handler 理解为数据的处理工序

工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成...)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)

handler 分 Inbound 和 Outbound 两类

把 eventLoop 理解为处理数据的工人

工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)

工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务

工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人

3. 组件

3.1 EventLoop

事件循环对象

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

它的继承关系比较复杂

一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法

另一条线是继承自 netty 自己的 OrderedEventExecutor,

提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop

提供了 parent 方法来看看自己属于哪个 EventLoopGroup

事件循环组

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

继承自 netty 自己的 EventExecutorGroup

实现了 Iterable 接口提供遍历 EventLoop 的能力

另有 next 方法获取集合中下一个 EventLoop

以一个简单的实现为例:

// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程

DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);

System.out.println(group.next());

System.out.println(group.next());

System.out.println(group.next());

Copy

输出

io.netty.channel.DefaultEventLoop@60f82f98

io.netty.channel.DefaultEventLoop@35f983a6

io.netty.channel.DefaultEventLoop@60f82f98

也可以使用 for 循环

DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);

for (EventExecutor eventLoop : group) {

    System.out.println(eventLoop);

}

输出

io.netty.channel.DefaultEventLoop@60f82f98

io.netty.channel.DefaultEventLoop@35f983a6

💡 优雅关闭

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

演示 NioEventLoop 处理 io 事件

服务器端两个 nio worker 工人

new ServerBootstrap()

    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))

    .channel(NioServerSocketChannel.class)

    .childHandler(new ChannelInitializer<NioSocketChannel>() {

        @Override

        protected void initChannel(NioSocketChannel ch) {

            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {

                @Override

                public void channelRead(ChannelHandlerContext ctx, Object msg) {

                    ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;

                    if (byteBuf != null) {

                        byte[] buf = new byte[16];

                        ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());

                        log.debug(new String(buf));

                    }

                }

            });

        }

    }).bind(8080).sync();

Copy

客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)

public static void main(String[] args) throws InterruptedException {

    Channel channel = new Bootstrap()

            .group(new NioEventLoopGroup(1))

            .handler(new ChannelInitializer<NioSocketChannel>() {

                @Override

                protected void initChannel(NioSocketChannel ch) throws Exception {

                    System.out.println("init...");

                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

                }

            })

            .channel(NioSocketChannel.class).connect("localhost", 8080)

            .sync()

            .channel();

    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));

    Thread.sleep(2000);    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));

Copy

最后输出

22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan      

22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan      

22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi          

22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi          

22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu       

22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu        

可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定

再增加两个非 nio 工人

DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);

new ServerBootstrap()

    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))

    .channel(NioServerSocketChannel.class)

    .childHandler(new ChannelInitializer<NioSocketChannel>() {

        @Override

        protected void initChannel(NioSocketChannel ch)  {

            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

            ch.pipeline().addLast(normalWorkers,"myhandler",

              new ChannelInboundHandlerAdapter() {

                @Override

                public void channelRead(ChannelHandlerContext ctx, Object msg) {

                    ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;

                    if (byteBuf != null) {

                        byte[] buf = new byte[16];

                        ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());

                        log.debug(new String(buf));

                    }

                }

            });

        }

    }).bind(8080).sync();