[Netty Source Code] Pipeline Questions (Part 8)
- A Pipeline is the logic chain
- A ChannelHandler is one piece of logic
- A ChannelHandlerContext is a node in the chain; it wraps a ChannelHandler
Contents
- 1. Questions
- 2. Pipeline walkthrough
- 2.1 Initializing the pipeline
- 2.2 Adding a ChannelHandler
- 2.3 Removing a ChannelHandler
- 3. How the Pipeline starts propagating a channel event
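1. Questions
- How does Netty determine the type of a ChannelHandler?
By checking which interface it implements: ChannelInboundHandler or ChannelOutboundHandler.
- What order do added ChannelHandlers follow?
Inbound events visit handlers in the order they were added; outbound events visit them in reverse order.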
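The two answers above can be illustrated with a small model. This is a minimal sketch in plain Java, not Netty code: `InboundStep`, `OutboundStep`, and `OrderDemo` are hypothetical names standing in for the real handler interfaces and the pipeline traversal. It only shows that type is decided by the implemented interface and that the two directions walk the same list from opposite ends.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for ChannelInboundHandler / ChannelOutboundHandler.
interface InboundStep { String name(); }
interface OutboundStep { String name(); }

final class OrderDemo {
    // Inbound: walk from the first added handler to the last, skipping non-inbound ones.
    static List<String> inboundOrder(List<Object> handlers) {
        List<String> visited = new ArrayList<>();
        for (Object h : handlers) {
            if (h instanceof InboundStep) {
                visited.add(((InboundStep) h).name());
            }
        }
        return visited;
    }

    // Outbound: walk from the last added handler back to the first, skipping non-outbound ones.
    static List<String> outboundOrder(List<Object> handlers) {
        List<String> visited = new ArrayList<>();
        for (int i = handlers.size() - 1; i >= 0; i--) {
            Object h = handlers.get(i);
            if (h instanceof OutboundStep) {
                visited.add(((OutboundStep) h).name());
            }
        }
        return visited;
    }

    // Handlers in the order they were "added": inA, outB, inC, outD.
    static List<Object> sample() {
        List<Object> handlers = new ArrayList<>();
        handlers.add((InboundStep) () -> "inA");
        handlers.add((OutboundStep) () -> "outB");
        handlers.add((InboundStep) () -> "inC");
        handlers.add((OutboundStep) () -> "outD");
        return handlers;
    }

    public static void main(String[] args) {
        System.out.println("inbound:  " + inboundOrder(sample()));
        System.out.println("outbound: " + outboundOrder(sample()));
    }
}
```

With the sample list, the inbound walk visits inA then inC, while the outbound walk visits outD then outB.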
2. Pipeline walkthrough
- Initializing the pipeline
- Adding and removing ChannelHandlers
- Event and exception propagation
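2.1 Initializing the pipeline
- The pipeline is created when the channel is created
- The ChannelHandlerContext interface
- The two sentinels: head and tail
The pipeline is created when the channel is created
NioServerSocketChannel() -> super() -> super() -> super() -> AbstractChannel()
Initializing the nodes: the head and tail sentinels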
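TailContext: mainly does the cleanup work
If an inbound message reaches the tail without being handled, it is logged and released; an exception that reaches the tail unhandled is logged as a warning
HeadContext: the entry point for inbound events; outbound operations that reach it are handed to the channel's Unsafe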
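Pipeline node data structure: ChannelHandlerContext
It propagates read and write events and stores attributes
The ChannelHandlerContext interface is implemented mainly through AbstractChannelHandlerContext
AbstractChannelHandlerContext links the nodes together through its next and prev fields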
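The structure described above is a doubly linked list bracketed by two sentinels. The following is a minimal sketch of that shape, assuming nothing about Netty beyond the prev/next links; `ChainNode` and `SentinelChain` are hypothetical names, and each node carries only a name instead of a handler.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a context node: a name plus prev/next links.
final class ChainNode {
    final String name;
    ChainNode prev;
    ChainNode next;

    ChainNode(String name) {
        this.name = name;
    }
}

final class SentinelChain {
    private final ChainNode head = new ChainNode("head");
    private final ChainNode tail = new ChainNode("tail");

    SentinelChain() {
        // An empty chain is just the two sentinels pointing at each other.
        head.next = tail;
        tail.prev = head;
    }

    // Splice a new node in immediately before the tail sentinel.
    SentinelChain addLast(String name) {
        ChainNode node = new ChainNode(name);
        ChainNode prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    // Direction followed by inbound events.
    List<String> headToTail() {
        List<String> names = new ArrayList<>();
        for (ChainNode n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    // Direction followed by outbound events.
    List<String> tailToHead() {
        List<String> names = new ArrayList<>();
        for (ChainNode n = tail; n != null; n = n.prev) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        SentinelChain chain = new SentinelChain().addLast("decoder").addLast("business");
        System.out.println(chain.headToTail());
        System.out.println(chain.tailToHead());
    }
}
```

Because the sentinels are always present, insertion never has to special-case an empty list.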
2.2 Adding a ChannelHandler
- Check for a duplicate add
- Create the node and link it into the list
- Invoke the handler-added callback
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // Check for a duplicate add
            checkMultiplicity(handler);
            // Wrap the handler in a context node
            newCtx = newContext(group, filterName(name, handler), handler);
            // Link the node into the list
            addLast0(newCtx);
            // Channel not registered yet: defer handlerAdded() until registration
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            EventExecutor executor = newCtx.executor();
            // Called from another thread: run the callback on the handler's executor
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
Checking whether the handler has already been added
    private static void checkMultiplicity(ChannelHandler handler) {
        // Only ChannelHandlerAdapter subclasses track the added flag
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            // A non-sharable handler that was already added is rejected
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

    public boolean isSharable() {
        Class<?> clazz = getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            // A handler is sharable if its class is annotated with @Sharable
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }
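The duplicate check can be reproduced outside Netty to see its behavior. The sketch below is an assumption-laden model, not the library's code: `Shareable`, `BaseStep`, `StatelessStep`, `StatefulStep`, and `MultiplicityCheck` are hypothetical names, and a `ConcurrentHashMap` replaces Netty's thread-local cache for brevity.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical marker annotation standing in for @Sharable.
// RUNTIME retention is required so reflection can see it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Shareable {}

// Hypothetical base class carrying the "already added" flag.
class BaseStep {
    boolean added;
}

@Shareable
final class StatelessStep extends BaseStep {}

final class StatefulStep extends BaseStep {}

final class MultiplicityCheck {
    // Caches the reflection result per class (Netty uses a thread-local map instead).
    private static final Map<Class<?>, Boolean> CACHE = new ConcurrentHashMap<>();

    static boolean isSharable(Class<?> clazz) {
        return CACHE.computeIfAbsent(clazz, c -> c.isAnnotationPresent(Shareable.class));
    }

    // Rejects a second add of any instance whose class is not annotated as sharable.
    static void check(BaseStep step) {
        if (!isSharable(step.getClass()) && step.added) {
            throw new IllegalStateException(
                    step.getClass().getName() + " is not sharable and was already added");
        }
        step.added = true;
    }

    public static void main(String[] args) {
        StatelessStep shared = new StatelessStep();
        check(shared);
        check(shared); // allowed: the class is annotated

        StatefulStep single = new StatefulStep();
        check(single);
        try {
            check(single); // rejected: same non-sharable instance added twice
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that the flag lives on the instance, so two separate instances of a non-sharable class can each be added once.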
Creating the node and linking it into the list
The handler is wrapped in a ChannelHandlerContext node
    newCtx = newContext(group, filterName(name, handler), handler);

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        // Bit mask recording which inbound/outbound events this handler class handles
        this.executionMask = mask(handlerClass);
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
Linking the node into the Pipeline's list
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
Invoking the handler-added callback
    private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
        newCtx.setAddPending();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                callHandlerAdded0(newCtx);
            }
        });
    }
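If the caller is not running on the handler's EventLoop thread, the callback is submitted as a task to that EventLoop's MPSC task queue; if it is, the callback runs directly.
The callback itself: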
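The "run directly or enqueue" decision can be sketched with a single-threaded executor from the JDK. This is only an illustration of the thread-affinity pattern, under the assumption that one dedicated thread plays the role of the EventLoop; `LoopAffinity` and its methods are hypothetical names, and the JDK executor's internal queue stands in for Netty's task queue.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class LoopAffinity {
    private volatile Thread loopThread;
    private final ExecutorService loop;

    LoopAffinity() {
        // The thread factory records the single worker thread so inLoop() can compare against it.
        loop = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });
    }

    // True only when called from the loop's own thread.
    boolean inLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the callback directly on the loop thread, otherwise queues it for that thread.
    void runOnLoop(Runnable callback) {
        if (inLoop()) {
            callback.run();
        } else {
            loop.execute(callback);
        }
    }

    // Stops accepting tasks and waits briefly for queued ones to finish.
    void shutdown() {
        loop.shutdown();
        try {
            loop.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Submits a callback from the calling thread and reports which thread executed it.
    static String demo() {
        LoopAffinity affinity = new LoopAffinity();
        AtomicReference<String> ranOn = new AtomicReference<>();
        affinity.runOnLoop(() -> ranOn.set(Thread.currentThread().getName()));
        affinity.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + demo());
    }
}
```

Funnelling every callback through one thread is what lets handler code run without its own locking.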
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
            boolean removed = false;
            try {
                remove0(ctx);
                ctx.callHandlerRemoved();
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }
            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }

    final void callHandlerAdded() throws Exception {
        if (setAddComplete()) {
            handler().handlerAdded(this);
        }
    }
ChannelInitializer.handlerAdded()
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            if (initChannel(ctx)) {
                removeState(ctx);
            }
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }
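This finally calls initChannel(), the method the user overrides in their own code. Afterwards the ChannelInitializer removes itself from the pipeline.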
2.3 Removing a ChannelHandler
Removal in an authentication scenario
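    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;

    public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            if (pass(msg)) {
                // Authenticated: this handler is no longer needed on the connection
                ctx.pipeline().remove(this);
            } else {
                ctx.close();
            }
        }

        // Placeholder: the real password check goes here
        private boolean pass(ByteBuf password) {
            return false;
        }
    }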
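If the password passes, the handler removes itself; otherwise the connection is closed
- Find the node to remove
- Unlink the node
- Invoke the handler-removed callback
Finding the node to remove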
    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }

    // Walk the list from head.next and return the node that wraps this handler instance
    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {
            if (ctx == null) {
                return null;
            }
            if (ctx.handler() == handler) {
                return ctx;
            }
            ctx = ctx.next;
        }
    }
Unlinking the node
    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        assert ctx != head && ctx != tail;
        synchronized (this) {
            remove0(ctx);
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }
            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }

    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
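Removing a node is an ordinary doubly-linked-list removal: the neighbors are pointed at each other.
Invoking the handler-removed callback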
    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        // Notify the complete removal.
        try {
            ctx.callHandlerRemoved();
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }

    final void callHandlerRemoved() throws Exception {
        try {
            // Only call handlerRemoved(...) if we called handlerAdded(...) before.
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }

    // ChannelInitializer.handlerRemoved()
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        initMap.remove(ctx);
    }
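The three removal steps (find the node, unlink it, fire the callback) can be followed end to end in a small stand-alone model. This is a sketch under simplifying assumptions rather than Netty's implementation: `Link` and `RemovableChain` are hypothetical names, handlers are plain objects, and the callback is recorded as a string in an `events` list instead of calling a real `handlerRemoved` method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified node: wraps a handler object and links to its neighbors.
final class Link {
    final Object handler;
    Link prev;
    Link next;

    Link(Object handler) {
        this.handler = handler;
    }
}

final class RemovableChain {
    private final Link head = new Link("head");
    private final Link tail = new Link("tail");
    // Records callbacks so the order of operations can be inspected.
    final List<String> events = new ArrayList<>();

    RemovableChain() {
        head.next = tail;
        tail.prev = head;
    }

    void addLast(Object handler) {
        Link link = new Link(handler);
        link.prev = tail.prev;
        link.next = tail;
        tail.prev.next = link;
        tail.prev = link;
    }

    // Step 1: locate the node by handler identity (==), not equals().
    private Link find(Object handler) {
        for (Link l = head.next; l != tail; l = l.next) {
            if (l.handler == handler) {
                return l;
            }
        }
        return null;
    }

    void remove(Object handler) {
        Link link = find(handler);
        if (link == null) {
            throw new NoSuchElementException(String.valueOf(handler));
        }
        // Step 2: unlink by pointing the neighbors at each other.
        link.prev.next = link.next;
        link.next.prev = link.prev;
        // Step 3: notify after the node is already out of the chain.
        events.add("removed:" + handler);
    }

    // Handlers between the sentinels, in head-to-tail order.
    List<Object> handlers() {
        List<Object> out = new ArrayList<>();
        for (Link l = head.next; l != tail; l = l.next) {
            out.add(l.handler);
        }
        return out;
    }

    public static void main(String[] args) {
        RemovableChain chain = new RemovableChain();
        String auth = "auth";
        String echo = "echo";
        chain.addLast(auth);
        chain.addLast(echo);
        chain.remove(auth);
        System.out.println(chain.handlers() + " " + chain.events);
    }
}
```

The callback fires only after the unlink, so a handler that is notified of its removal is already out of the event path.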
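3. How the Pipeline starts propagating a channel event
ChannelHandlerContext has an abstract implementation, AbstractChannelHandlerContext, whose default concrete subclass is DefaultChannelHandlerContext. DefaultChannelHandlerContext holds a handler field, which is the wrapped handler shown in the figure above.
Handler events propagate as a chain of calls, and ChannelHandlerContext is what carries the chain forward. The pipeline exposes many fireChannelXXX methods; each one starts the chained invocation of its event at the head node's handler.
Take fireChannelRegistered() as an example
HeadContext.channelRegistered()
The callback then runs from the handler passed in as the argument on to the next handler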
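The chained invocation can be modeled in a few lines. This is a minimal sketch, not Netty's classes: `RegHandler`, `RegContext`, and `RegPipeline` are hypothetical names, there is no tail sentinel, and no inbound/outbound filtering. It shows the two entry points: the pipeline-level call that always starts at head, and the context-level call that hands the event to the next node only if the current handler chooses to forward it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler interface with a single inbound event.
interface RegHandler {
    void channelRegistered(RegContext ctx);
}

// Hypothetical context: wraps a handler and knows the next node.
final class RegContext {
    final String name;
    final RegHandler handler;
    RegContext next;

    RegContext(String name, RegHandler handler) {
        this.name = name;
        this.handler = handler;
    }

    // Context-level fire: invoke the NEXT node's handler, if there is one.
    void fireChannelRegistered() {
        if (next != null) {
            next.handler.channelRegistered(next);
        }
    }
}

final class RegPipeline {
    // Names of the handlers that actually saw the event, in order.
    final List<String> trace = new ArrayList<>();
    private final RegContext head;
    private RegContext last;

    RegPipeline() {
        // The head node always forwards the event.
        head = new RegContext("head", ctx -> {
            trace.add("head");
            ctx.fireChannelRegistered();
        });
        last = head;
    }

    // Appends a handler that records its name and forwards only when asked to.
    RegPipeline addLast(String name, boolean forward) {
        RegContext ctx = new RegContext(name, c -> {
            trace.add(name);
            if (forward) {
                c.fireChannelRegistered();
            }
        });
        last.next = ctx;
        last = ctx;
        return this;
    }

    // Pipeline-level fire: the event always enters at head.
    RegPipeline fireChannelRegistered() {
        head.handler.channelRegistered(head);
        return this;
    }

    public static void main(String[] args) {
        RegPipeline p = new RegPipeline()
                .addLast("a", true)
                .addLast("b", false) // b swallows the event
                .addLast("c", true)
                .fireChannelRegistered();
        System.out.println(p.trace);
    }
}
```

In this model handler c never sees the event because b does not call `fireChannelRegistered()` on its context, which mirrors how a handler can end propagation simply by not forwarding.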