> 文章列表 > Netty源码解读

Netty源码解读

Netty源码解读

Netty源码解读

Netty线程模型

在这里插入图片描述
1、定义了两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写
2、BossGroup和WorkerGroup类型都是NioEventLoopGroup,Group中维护了多个事件循环线程NioEventLoop,每个NioEventLoop维护了一个Selector和TaskQueue
3、每个Boss NioEventLoop线程内部循环执行的步骤有 3 步
3.1、处理accept事件 , 与client 建立连接 , 生成 NioSocketChannel
3.2、将NioSocketChannel注册到某个worker NIOEventLoop上的selector
3.3、runAllTasks处理任务队列TaskQueue的任务
4、 每个worker NioEventLoop线程循环执行的步骤
4.1、轮询注册到自己selector上的所有NioSocketChannel 的read, write事件
4.2、处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
4.3、runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入TaskQueue中慢慢处理,这样不影响数据在pipeline中的流动处理
4.4、处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler处理器用来处理 channel 中的数据

Netty服务启动示例

// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//对workerGroup的SocketChannel设置handler处理器ch.pipeline().addLast(new NettyServerHandler());}});
// 启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(9099).sync();

Netty源码分析

从bootstrap.bind作为入口分析启动流程,进入后可以看到会调用AbstractBootstrap#doBind,最终会调用initAndRegister()方法,主要逻辑都在前三步中实现,本次也主要分析这三个步骤

# AbstractBootstrap类
// 1、创建一个服务端Channel,即NioServerSocketChannel
channel = channelFactory.newChannel();
// 2、初始化NioServerSocketChannel,在pipeline中添加一些处理器hander
init(channel);
// 3、进行注册
ChannelFuture regFuture = config().group().register(channel);
// 把NioServerSocketChannel绑定到指定端口
channel.bind(localAddress, promise);

channelFactory.newChannel();

bootstrap.channel(NioServerSocketChannel.class) 会将serverChannel绑定到ReflectiveChannelFactory上

# AbstractBootstrap类public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");}return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

channelFactory.newChannel()会调用ReflectiveChannelFactory的newChannel方法,进而调用constructor.newInstance(),而该constructor正好是NioServerSocketChannel类;所以new的对象就是NioServerSocketChannel
服务端NioServerSocketChannel进行初始化
1、设置感兴趣事件为连接事件OP_ACCEPT
2、设置channel为非阻塞
3、初始化服务端pipeline

# NioServerSocketChannel类public NioServerSocketChannel(ServerSocketChannel channel) {// 将感兴趣的事件设置为连接事件OP_ACCEPTsuper(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}// 父类初始化方法 ch 即为NioServerSocketChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;// 设置为非阻塞ch.configureBlocking(false);
}// 父类的父类中初始化pipeline,此时只有HeadContext和TailContext
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();
}

init(channel)

调用ServerBootstrap.init方法,向服务端NioServerSocketChannel的pipeline中添加hander处理器ChannelInitializer;此时服务端pipeline链表中的hander如下
在这里插入图片描述

# ServerBootstrap 类void init(Channel channel) throws Exception {ChannelPipeline p = channel.pipeline();//向 pipeline中添加hander处理器ChannelInitializerp.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}

config().group().register(channel)

bootstrap.group(bossGroup, workerGroup)构造时,将group设置为bossGroup,childGroup设置为workerGroup; config().group().register(channel)会调用bossGroup的register方法,从bossGroup的MultithreadEventLoopGroup线程组中取一个线程SingleThreadEventLoop进行调用register方法

register注册逻辑

服务端的NioServerSocketChannel和客户端的NioSocketChannel都会调用此方法进行注册
1、服务启动时,NioServerSocketChannel注册到selector上,对客户端OP_ACCEPT操作感兴趣
2、当有客户端连接时,通过NioServerSocketChannel的accept()得到每个客户端的NioSocketChannel,将其注册到selector上,对客户端OP_READ操作感兴趣

# SingleThreadEventLoop extends SingleThreadEventExecutor 类public ChannelFuture register(final ChannelPromise promise) {promise.channel().unsafe().register(this, promise);return promise;
}

调用AbstractChannel的register方法,创建一个注册的task交给EventLoop线程处理

# AbstractChannel 类public final void register(EventLoop eventLoop, final ChannelPromise promise) {.......AbstractChannel.this.eventLoop = eventLoop;.......// 1、处理连接事件时,用的是bossGroup里的NioEventLoop// 2、处理读写事件时,用的是workGroup里的NioEventLoopeventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});
}private void register0(ChannelPromise promise) {doRegister();// 1、NioServerSocketChannel 处理逻辑// 调用NioServerSocketChannel服务端pipeline中hander的handlerAdded方法// 此时会调用到ChannelInitializer的handlerAdded,然后调用其initChannel,该方法中// 会向服务端pipeline中加入ServerBootstrapAcceptor// 调用服务端pipeline中hander的channelRegistered方法// 调用服务端pipeline中hander的ChannelActive方法// 2、NioSocketChannel 处理逻辑 调用我们自定义hander中的方法// 调用客户端pipeline中hander的handlerAdded方法// 调用客户端pipeline中hander的channelRegistered方法// 调用客户端pipeline中hander的ChannelActive方法,我们自定义hander的ChannelActive在此调用pipeline.invokeHandlerAddedIfNeeded();pipeline.fireChannelRegistered();pipeline.fireChannelActive();}
// doRegister()逻辑由子类AbstractNioChannel实现
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 将channel注册到Selector上// 1、NioServerSocketChannel注册到Selector上// 2、NioSocketChannel注册到Selector上selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {}}
}

eventLoop.execute就是调用SingleThreadEventExecutor#execute

# SingleThreadEventExecutor 类
@Override
public void execute(Runnable task) {// 将注册register0逻辑加入队列taskQueueaddTask(task);// 开启线程循环监听事件,会调用SingleThreadEventExecutor.run方法// 最终调用子类NioEventLoop的run()方法startThread();
}

死循环执行 selector.select方法,直到监听到事件或者超时,才会执行processSelectedKeys逻辑
1、服务端启动后,NioServerSocketChannel若监听到客户端OP_ACCEPT操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环
2、客户端连接成功后,NioSocketChannel若监听到客户端OP_READ操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环

# NioEventLoop 类
@Override
protected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {....case SelectStrategy.SELECT:// 该方法监听到事件(OP_ACCEPT|OP_READ)时才会返回select(wakenUp.getAndSet(false));default:}} catch (IOException e) {.....}// 监听到事件执行try {// 1、获取SelectionKey处理事件processSelectedKeys();} finally {// 2、执行taskQueue中其他的注册方法register0runAllTasks();}}}
}   private void select(boolean oldWakenUp) throws IOException {// 一直循环遍历int selectCnt = 0;for (;;) {// 根据注册的定时任务,获取本次select的阻塞时间long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;// 没有监听到事件或没有超时,则一直阻塞(会让出cpu资源)int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// 正常场景// 当有连接|读写操作或者selector被唤醒了,则直接返回break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// 正常场景// 说明没有监听到事件,而是超时了,则重置selectCntselectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// 异常场景  select 空轮询bug修复// 若空轮询次数超过SELECTOR_AUTO_REBUILD_THRESHOLD配置// 则关闭老的select,建立新的selectselector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}
}private void processSelectedKeysOptimized() {// 遍历所有的selectedKeys进行处理for (int i = 0; i < selectedKeys.size; ++i) {processSelectedKey(k, (AbstractNioChannel) a);}
}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();int readyOps = k.readyOps();if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// 连接|读写操作会调用该方法// 1、连接操作调用NioMessageUnsafe的read方法// 2、读写操作调用NioByteUnsafe的read方法unsafe.read();}
}

OP_ACCEPT连接操作处理
1、为每个客户端创建NioSocketChannel,并进行初始化
1.1、设置感兴趣事件为OP_READ
1.2、设置channel为非阻塞
1.3、初始化客户端pipeline
2、调用服务端NioServerSocketChannel的pipeline,将客户端的NioSocketChannel作为参数传过去,最终会调用到ServerBootstrapAcceptor,将NioSocketChannel注册到workGroup上

# NioMessageUnsafe 类
public void read() {final ChannelPipeline pipeline = pipeline();// 创建每个客户端的NioSocketChanneldoReadMessages(readBuf);int size = readBuf.size();// readBuf为NioSocketChannel,遍历客户端所有的NioSocketChannel// 执行服务端NioServerSocketChannel的pipeline,循环执行fireChannelRead,// 最终会调用服务端hander的ChannelRead方法,此处会调用到ServerBootstrapAcceptor的ChannelRead方法for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}// 调用服务端pipeline的读完成方法pipeline.fireChannelReadComplete();}protected abstract int doReadMessages(List<Object> buf) throws Exception;
// 调用子类NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {// 获取客户端的连接得到SocketChannel,每个客户端在服务端都有一个对应的SocketChannelSocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {// NioSocketChannel处理方式同NioServerSocketChannel// 1、设置感兴趣事件为连接事件OP_READ// 2、设置channel为非阻塞// 3、初始化客户端NioSocketChannel的pipelinebuf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {}return 0;
}

将我们自定义的hander添加到NioSocketChannel的pipeline上,然后将NioSocketChannel注册到workGroup上,此时客户端pipeline链表中的hander如下
在这里插入图片描述

# ServerBootstrapAcceptor 类public void channelRead(ChannelHandlerContext ctx, Object msg) {// 传过来的NioSocketChannelfinal Channel child = (Channel) msg;// 将我们手动添加的Hander添加到pipelinechild.pipeline().addLast(childHandler);try {// 将NioSocketChannel注册workGroup的一个线程的selector上,// 方式同NioServerSocketChannel,执行register注册逻辑childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {.....}});} catch (Throwable t) {}
}

OP_READ操作处理
进行数据读写,并执行pipeline中自定义的hander

# NioByteUnsafe类// 接受到客户端OP_READ事件时调用
public void read() {// 获取客户端NioSocketChannel的pipelinefinal ChannelPipeline pipeline = pipeline();do {// 数据读写// 调用pipeline.fireChannelRead时会依次调用pipeline中hander的ChannelRead方法// 我们自定义的hander的ChannelRead方法就会在此处调用byteBuf = allocHandle.allocate(allocator);allocHandle.lastBytesRead(doReadBytes(byteBuf));pipeline.fireChannelRead(byteBuf);} while (allocHandle.continueReading());allocHandle.readComplete();// 调用pipeline的读完成方法pipeline.fireChannelReadComplete();
}