> 文章列表 > [Netty源码] Pipeline相关问题 (八)

[Netty源码] Pipeline相关问题 (八)

[Netty源码] Pipeline相关问题 (八)

  • Pipeline相当于Logic Chain, 逻辑链
  • ChannelHandler相当于Logic, 逻辑
  • ChannelHandlerContext相当于逻辑节点, 封装了ChannelHandler

文章目录

      • 1.Q
      • 2.Pipeline过程分析
        • 2.1 pipeline的初始化
        • 2.2 添加ChannelHandler
        • 2.3 删除ChannelHandler
      • 3.Pipeline如何发起一个channel传递

1.Q

  1. netty如何判断channelHandler类型的
    [Netty源码] Pipeline相关问题 (八)
    通过看实现了什么样的接口去判断类型, ChannelInboundHandler和ChannelOutboundHandler
  2. 对于channelHandler的添加遵循什么顺序
    [Netty源码] Pipeline相关问题 (八)
    inBound是顺序, outBound是逆序

2.Pipeline过程分析

  1. pipeline的初始化
  2. 添加删除ChannelHandler
  3. 事件和异常传播

2.1 pipeline的初始化

  1. pipeline在创建channel时候被创建
  2. ChannelHandlerContext接口
  3. head和tail两大哨兵

pipeline在创建channel时候被创建

NioServerSocketChannel() -> super() -> super() -> super() -> AbstractChannel()

[Netty源码] Pipeline相关问题 (八)

[Netty源码] Pipeline相关问题 (八)

[Netty源码] Pipeline相关问题 (八)

初始化节点 head和tail两大哨兵

TailContext: 主要做收尾的工作

[Netty源码] Pipeline相关问题 (八)

[Netty源码] Pipeline相关问题 (八)

如果没有读取数据, 抛出异常

[Netty源码] Pipeline相关问题 (八)

HeadContext

[Netty源码] Pipeline相关问题 (八)

pipeline节点数据结构: ChannelHandlerContext

[Netty源码] Pipeline相关问题 (八)

有传播读和传播写的功能, 属性存储的功能

[Netty源码] Pipeline相关问题 (八)

主要通过AbstractChannelHandlerContext实现ChannelHandlerContext接口

[Netty源码] Pipeline相关问题 (八)

AbstractChannelHandlerContext存在next和prev来连接

2.2 添加ChannelHandler

  1. 判断是否重复添加
  2. 创建节点添加链表
  3. 回调添加完成事件

[Netty源码] Pipeline相关问题 (八)

[Netty源码] Pipeline相关问题 (八)

    @Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {// 检查是否重复添加checkMultiplicity(handler);// 封装节点newCtx = newContext(group, filterName(name, handler), handler);// 添加节点addLast0(newCtx);if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;}}callHandlerAdded0(newCtx);return this;}

判断节点是否被重复添加

    private static void checkMultiplicity(ChannelHandler handler) {// 判断是否是ChannelHandlerAdapterif (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;// 判断如果是非共享, 且被添加过 抛出异常if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}public boolean isSharable() {Class<?> clazz = getClass();Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();Boolean sharable = cache.get(clazz);if (sharable == null) {// 判断是否注解有@Sharable, 有则是可共享的sharable = clazz.isAnnotationPresent(Sharable.class);cache.put(clazz, sharable);}return sharable;}

创建节点添加链表

创建节点信息, 封装为ChannelHandlerContext

    newCtx = newContext(group, filterName(name, handler), handler);private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, handler.getClass());this.handler = handler;}AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,String name, Class<? extends ChannelHandler> handlerClass) {this.name = ObjectUtil.checkNotNull(name, "name");this.pipeline = pipeline;this.executor = executor;this.executionMask = mask(handlerClass);// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.ordered = executor == null || executor instanceof OrderedEventExecutor;}

添加节点至Pipeline链表

    private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}

回调添加完成事件

[Netty源码] Pipeline相关问题 (八)

private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {newCtx.setAddPending();executor.execute(new Runnable() {@Overridepublic void run() {callHandlerAdded0(newCtx);}});}

如果不在当前线程的话丢入到Mscqueue, 如果在的话就直接执行

事件回调:

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.callHandlerAdded();} catch (Throwable t) {boolean removed = false;try {remove0(ctx);ctx.callHandlerRemoved();removed = true;} catch (Throwable t2) {if (logger.isWarnEnabled()) {logger.warn("Failed to remove a handler: " + ctx.name(), t2);}}if (removed) {fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() +".handlerAdded() has thrown an exception; removed.", t));} else {fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() +".handlerAdded() has thrown an exception; also failed to remove.", t));}}}final void callHandlerAdded() throws Exception {if (setAddComplete()) {handler().handlerAdded(this);}}

ChannelInitilizer.handlerAdded()

    @Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {if (initChannel(ctx)) {removeState(ctx);}}}private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) { // Guard against re-entrance.try {initChannel((C) ctx.channel());} catch (Throwable cause) {// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).// We do so to prevent multiple calls to initChannel(...).exceptionCaught(ctx, cause);} finally {ChannelPipeline pipeline = ctx.pipeline();if (pipeline.context(this) != null) {pipeline.remove(this);}}return true;}return false;}

最终调用到initChannel(), 用户代码自定义的initChannel()。

2.3 删除ChannelHandler

权限校验场景下删除

[Netty源码] Pipeline相关问题 (八)

public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {if (pass(msg)){ctx.pipeline().remove(this);}else {ctx.close();}}private boolean pass(ByteBuf password){return false;}
}

密码通过的话删除handler, 没通过的话关闭连接

  1. 得到需要删除的节点
  2. 删除节点
  3. 回调事件结果

[Netty源码] Pipeline相关问题 (八)

得到需要删除的节点

    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);if (ctx == null) {throw new NoSuchElementException(handler.getClass().getName());} else {return ctx;}}// 得到删除的节点@Overridepublic final ChannelHandlerContext context(ChannelHandler handler) {if (handler == null) {throw new NullPointerException("handler");}AbstractChannelHandlerContext ctx = head.next;for (;;) {if (ctx == null) {return null;}if (ctx.handler() == handler) {return ctx;}ctx = ctx.next;}}

删除节点

    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {assert ctx != head && ctx != tail;synchronized (this) {remove0(ctx);if (!registered) {callHandlerCallbackLater(ctx, false);return ctx;}EventExecutor executor = ctx.executor();if (!executor.inEventLoop()) {executor.execute(new Runnable() {@Overridepublic void run() {callHandlerRemoved0(ctx);}});return ctx;}}callHandlerRemoved0(ctx);return ctx;}private static void remove0(AbstractChannelHandlerContext ctx) {AbstractChannelHandlerContext prev = ctx.prev;AbstractChannelHandlerContext next = ctx.next;prev.next = next;next.prev = prev;}

删除节点的过程就是链表的删除的过程

回调事件结果

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {// Notify the complete removal.try {ctx.callHandlerRemoved();} catch (Throwable t) {fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));}}final void callHandlerRemoved() throws Exception {try {// Only call handlerRemoved(...) if we called handlerAdded(...) before.if (handlerState == ADD_COMPLETE) {handler().handlerRemoved(this);}} finally {// Mark the handler as removed in any case.setRemoved();}}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {initMap.remove(ctx);}

3.Pipeline如何发起一个channel传递

[Netty源码] Pipeline相关问题 (八)

[Netty源码] Pipeline相关问题 (八)

ChannelHandlerContext有一个抽象实现AbstractChannelHandlerContext, AbstractChannelHandlerContext默认的实现是DefaultChannelHandlerContext, DefaultChannelHandlerContext中存在一个handler的成员变量, 是上图包装的handler

handler的事件传播为链式调用, 靠ChannelHandlerContext实现完成, 首先pipeline存在很多fireChannelXXX方法, 这些方法分别对应着每一个事件从head节点的handler事件链式调用。

[Netty源码] Pipeline相关问题 (八)

以fireChannelRegistered()为例

HeadContext.channelRegistered()

[Netty源码] Pipeline相关问题 (八)

[Netty源码] Pipeline相关问题 (八)

[Netty源码] Pipeline相关问题 (八)

[Netty源码] Pipeline相关问题 (八)

从入参指定的handler到下一个handler的回调执行事件

[Netty源码] Pipeline相关问题 (八)