Netty核心源码分析(二),Netty的Server端接收请求过程源码分析
文章目录
系列文章目录
Netty核心源码分析(一),Netty的Server端启动过程源码分析
Netty核心源码分析(二),Netty的Server端接收请求过程源码分析
一、连接请求接受过程源码分析
当Netty服务器启动之后,执行到NioEventLoop的run方法,这是一个死循环,不断地循环判断有没有事件发生:
// io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and// 'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and// 'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {// 关键方法processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
}
1、事件的值
读、写、连接、accept事件,都是在SelectionKey中定义的,利用的是偏移量进行计算
// 1
public static final int OP_READ = 1 << 0;
// 4
public static final int OP_WRITE = 1 << 2;
// 8
public static final int OP_CONNECT = 1 << 3;
// 16
public static final int OP_ACCEPT = 1 << 4;
2、processSelectedKeys获取事件
// io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}
}// io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loop// 判断事件,selectKeys最终执行的方法if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}
当我们有客户端连接时,此时我们可以看到readyOps 的值是16,对应着ACCEPT事件也是16,此时会执行unsafe.read()方法。
NioMessageUnsafe是AbstractNioMessageChannel的一个内部类:
// io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
private final class NioMessageUnsafe extends AbstractNioUnsafe {private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {// 检查该eventLoop是否是当前线程assert eventLoop().inEventLoop();final ChannelConfig config = config(); // 拿到configfinal ChannelPipeline pipeline = pipeline(); // 拿到pipelinefinal RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {// readBuf是一个list,此处循环执行该方法// doReadMessages是读取boss线程中的NioServerSocketChannel接收到的请求,并把这些请求放进readBufint localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 循环调用handler中的ChannelRead方法pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}
}
(1)doReadMessages方法
// io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {// 实际上调用NIO的的accept方法,获取SocketChannelSocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {// 将NIO的SocketChannel包装成NioSocketChannelbuf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;
}
我们发现,doReadMessages实际上是调用了NIO的SocketChannel的accept方法,获取到NIO的SocketChannel。
然后使用Netty的NioSocketChannel对NIO的SocketChannel进行了包装,最后添加到buf容器中。
(2)pipeline的fireChannelRead方法
doReadMessages方法获取到NioSocketChannel之后,fireChannelRead将这些NioSocketChannel作为参数传入。
// io.netty.channel.DefaultChannelPipeline#fireChannelRead
@Override
public final ChannelPipeline fireChannelRead(Object msg) {AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;
}
// io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}// io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);}
}
我们发现,会调用channelHandler的channelRead方法,
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {invokeChannelRead(findContextInbound(), msg);return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}
我们发现,调用channelRead方法时,会调用ctx.fireChannelRead(msg); 将请求传递给pipeline中的下一个Handler,而在pipeline中,Handler是链式存储的。
而ServerBootstrapAcceptor是pipeline的最后一个Handler,它的channelRead方法是在ServerBootStrap父类中实现的,我们继续看ServerBootstrapAcceptor的channelRead方法。
注意!此处的Handler,并不是child的Handler!而是bossGroup中定义的Handler!
woekerGroup定义的Handler在children中!
(3)ServerBootstrapAcceptor的channelRead方法
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try { // 将客户端连接注册到worker线程池// 此处的childGroup,就是我们的workergroup,child就是NioSocketChannelchildGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}
}
最终调用workGroup(childGroup)的register方法,将NioSocketChannel注册到workerGroup(childGroup)中去。
注意!workGroup是我们命名的,Netty称为childGroup
3、workerGroup的register过程
上面分析到,bossGroup拿到NioSocketChannel之后,会调用workerGroup(childGroup)的register方法进行注册:
// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {return next().register(channel);
}
next方法会轮询拿到workerGroup的下一个线程(workerGroup一般会创建多个线程,默认为cpu核心数*2),调用register方法将NioSocketChannel传入。
// io.netty.util.concurrent.DefaultEventExecutorChooserFactory.GenericEventExecutorChooser#next
@Override
public EventExecutor next() {// 取下一个执行器return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
我们继续看register方法:
// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {// 对Channel进一步包装return register(new DefaultChannelPromise(channel, this));
}
// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
@Override
public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");// 真正的register的方法promise.channel().unsafe().register(this, promise);return promise;
}
// io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");}if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {// 进入到这里register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}
register0方法中,执行了doRegister方法注册,并且判断Channel中有没有可能存在的任务,进行执行。
// io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;// 执行注册doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {// 第一次注册执行ChannelActive方法pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805// 开始读取beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}
// io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}
}
4、beginRead方法
// io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead
@Override
public final void beginRead() {assertEventLoop();if (!isActive()) {return;}try {doBeginRead();} catch (final Exception e) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireExceptionCaught(e);}});close(voidPromise());}
}// io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}
}
到这,Server端接收到Client端发送来的连接请求,并将请求注册到了workerGroup中,workerGroup开始接收数据了。
5、Netty接受请求过程源码小结
总体流程:接受连接---->创建一个新的 NioSocketChannel---------->注册到一个 worker EventLoop-------->注册 selecot Read 事件。
1)服务器轮询 Accept 事件,获取事件后调用 unsafe 的 read 方法,这个 unsafe 是 ServerSocket 的内部类。
2)doReadMessages 用于创建 NioSocketChannel 对象,该对象包装 JDK 的 Nio Channel 客户端。该方法会像创建 ServerSocketChanel 类似创建相关的 pipeline , unsafe,config。
3)随后执行 pipeline.fireChannelRead 方法,在ServerBootstrapAcceptor的channelRead方法中,调用workerGroup(childGroup)的register方法进行注册。
4)注册成功后,开始监听Read事件。