[Netty Source Code] Pipeline Questions (Part 8)
- The Pipeline acts as the logic chain.
- A ChannelHandler is one unit of logic in that chain.
- A ChannelHandlerContext is a node in the chain; it wraps a ChannelHandler.
Contents
- 1.Q
- 2.Pipeline walkthrough
- 2.1 Pipeline initialization
- 2.2 Adding a ChannelHandler
- 2.3 Removing a ChannelHandler
- 3.How the Pipeline starts propagating a channel event
1.Q
- How does Netty determine the type of a ChannelHandler?
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/aa4f3a4395e04246a0f9a563d7f99f1b.png)
By checking which interface the handler implements: ChannelInboundHandler or ChannelOutboundHandler.
- In what order do the added ChannelHandlers run?
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/175f6b91ed2745d8b634f004f34f0f2b.png)
Inbound events pass through the handlers in the order they were added; outbound events pass through them in reverse order.
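The two answers above can be illustrated with a small JDK-only model. This is a sketch, not Netty code: `SketchHandler`, `SketchInbound`, `SketchOutbound`, and `PipelineOrderSketch` are hypothetical names that stand in for the real handler interfaces and the pipeline. It assumes only what the text states: the handler type is decided by the interface it implements, inbound handlers run in insertion order, and outbound handlers run in reverse.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for ChannelHandler, ChannelInboundHandler and ChannelOutboundHandler.
interface SketchHandler { String name(); }
interface SketchInbound extends SketchHandler {}
interface SketchOutbound extends SketchHandler {}

final class PipelineOrderSketch {
    private final List<SketchHandler> handlers = new ArrayList<>();

    PipelineOrderSketch addLast(SketchHandler h) {
        handlers.add(h);
        return this;
    }

    // Inbound events visit only inbound handlers, in insertion order.
    List<String> inboundOrder() {
        List<String> visited = new ArrayList<>();
        for (SketchHandler h : handlers) {
            if (h instanceof SketchInbound) {
                visited.add(h.name());
            }
        }
        return visited;
    }

    // Outbound events visit only outbound handlers, in reverse insertion order.
    List<String> outboundOrder() {
        List<String> visited = new ArrayList<>();
        for (int i = handlers.size() - 1; i >= 0; i--) {
            SketchHandler h = handlers.get(i);
            if (h instanceof SketchOutbound) {
                visited.add(h.name());
            }
        }
        return visited;
    }

    static SketchInbound in(String n) { return () -> n; }
    static SketchOutbound out(String n) { return () -> n; }

    static PipelineOrderSketch demo() {
        return new PipelineOrderSketch()
                .addLast(in("InA")).addLast(out("OutA"))
                .addLast(in("InB")).addLast(out("OutB"));
    }

    public static void main(String[] args) {
        PipelineOrderSketch p = demo();
        System.out.println(p.inboundOrder());  // prints [InA, InB]
        System.out.println(p.outboundOrder()); // prints [OutB, OutA]
    }
}
```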
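2.Pipeline walkthrough
- Pipeline initialization
- Adding and removing ChannelHandlers
- Event and exception propagation
2.1 Pipeline initialization
- The pipeline is created when the channel is created
- The ChannelHandlerContext interface
- The two sentinels, head and tail
The pipeline is created when the channel is created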
NioServerSocketChannel() -> super() -> super() -> super() -> AbstractChannel()
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/c58b569291bb4a029adfbd6792c7f96b.png)
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/c748ac4dc0164724bfead423f383e9f3.png)
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/327c008f633c4973a3c7bc53d3736694.png)
Initializing the nodes: the two sentinels, head and tail
TailContext: mainly does the cleanup work at the end of the chain
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/400003e245ef4c668f2db61557cbe8cc.png)
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/bab6985db5e149cb8452168ac698a3f4.png)
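If an inbound message reaches the tail without having been consumed, TailContext logs it and releases it; an exception that reaches the tail unhandled is logged as a warning.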
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/519deb06c573444592f918bc2e97acdf.png)
HeadContext
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/85f9a2b11c374e4da65eff0d89cca789.png)
Pipeline node data structure: ChannelHandlerContext
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/becf5dee91354dada88979169cf5af7b.png)
It can propagate read (inbound) and write (outbound) events, and it can store attributes.
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/3a40ff3897f64820b6a530f34c01e754.png)
The ChannelHandlerContext interface is implemented mainly by AbstractChannelHandlerContext.
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/41a0015815f94d7fb0a34310dbcf0e71.png)
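AbstractChannelHandlerContext holds next and prev references that link the nodes into a doubly linked list.
2.2 Adding a ChannelHandler
- Check whether the handler has already been added
- Create a node and link it into the list
- Fire the handler-added callback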
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/5af13a313d344b46910e6b9ac6a4156b.png)
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/6c5c62089280465380ad99a8036a2917.png)
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // Check whether the handler is being added more than once
            checkMultiplicity(handler);
            // Wrap the handler in a context node
            newCtx = newContext(group, filterName(name, handler), handler);
            // Link the node into the list
            addLast0(newCtx);
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

Check whether the handler has already been added
    private static void checkMultiplicity(ChannelHandler handler) {
        // Only ChannelHandlerAdapter subclasses track the "added" flag
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            // A non-sharable handler that has already been added is rejected
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

    public boolean isSharable() {
        Class<?> clazz = getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            // A handler is sharable if its class is annotated with @Sharable
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }
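To make the rule in checkMultiplicity concrete, here is a minimal JDK-only sketch. It is an illustration under assumptions, not Netty's implementation: the `@Shared` annotation, the `Handler` base class, and the `canAddTwice` helper are hypothetical names, and the sketch skips the per-thread cache that the real isSharable() uses.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

final class SharableCheckSketch {
    // Hypothetical stand-in for Netty's @Sharable annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Shared {}

    // Hypothetical stand-in for ChannelHandlerAdapter: it only tracks the "added" flag.
    static class Handler {
        boolean added;

        boolean isSharable() {
            return getClass().isAnnotationPresent(Shared.class);
        }
    }

    @Shared
    static class StatelessHandler extends Handler {}

    static class StatefulHandler extends Handler {}

    // Same rule as checkMultiplicity: reject a non-sharable handler that was already added.
    static void checkMultiplicity(Handler h) {
        if (!h.isSharable() && h.added) {
            throw new IllegalStateException(h.getClass().getName() + " is not sharable");
        }
        h.added = true;
    }

    // Returns true if the same instance can pass the check twice.
    static boolean canAddTwice(Handler h) {
        try {
            checkMultiplicity(h);
            checkMultiplicity(h);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canAddTwice(new StatelessHandler())); // prints true
        System.out.println(canAddTwice(new StatefulHandler()));  // prints false
    }
}
```

The practical consequence is that a handler holding per-connection state should not carry the annotation, because the same instance would then be shared across pipelines.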
Create a node and link it into the list
The handler is wrapped in a new node, a ChannelHandlerContext
    newCtx = newContext(group, filterName(name, handler), handler);

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.executionMask = mask(handlerClass);
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

Link the node into the Pipeline's list
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

Fire the handler-added callback
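The four pointer updates in addLast0 are easier to follow on a toy list. The sketch below is an assumption-laden model, not Netty code: `SentinelListSketch` and `Node` are hypothetical names, and nodes carry only a name. It shows that with head and tail sentinels always present, inserting before tail needs no null checks.

```java
import java.util.ArrayList;
import java.util.List;

final class SentinelListSketch {
    // Hypothetical node type standing in for AbstractChannelHandlerContext.
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    SentinelListSketch() {
        // An empty list is just the two sentinels pointing at each other.
        head.next = tail;
        tail.prev = head;
    }

    // Same four assignments as addLast0: splice the new node in just before tail.
    Node addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return node;
    }

    // Walk the list from head to tail and collect the node names.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        SentinelListSketch list = new SentinelListSketch();
        list.addLast("decoder");
        list.addLast("business");
        System.out.println(list.names()); // prints [head, decoder, business, tail]
    }
}
```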
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/d5572ef8354746e2a17c2be3ab8d1f2a.png)
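    private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
        newCtx.setAddPending();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                callHandlerAdded0(newCtx);
            }
        });
    }

If the caller is not running on the handler's executor thread, the callback is submitted to that executor as a task (for a NIO event loop it goes into the MpscQueue); if it is, the callback runs directly.
The callback itself: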
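The "run directly or enqueue" decision can be sketched with a plain single-thread executor. This is a simplified model under assumptions: `EventLoopSketch`, `runCallback`, and the two demo methods are hypothetical names, and a JDK `ExecutorService` stands in for Netty's event loop and its task queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

final class EventLoopSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread plays the role of the event loop
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run the callback directly when already on the loop thread; otherwise enqueue it.
    void runCallback(Runnable callback) {
        if (inEventLoop()) {
            callback.run();
        } else {
            executor.execute(callback);
        }
    }

    private static void await(CountDownLatch latch) {
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // A call from an outside thread is enqueued and runs on the loop thread.
    static String threadOfOutsideCall() {
        EventLoopSketch loop = new EventLoopSketch();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        loop.runCallback(() -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        });
        await(done);
        loop.executor.shutdown();
        return ranOn.get();
    }

    // A call made from the loop thread itself runs immediately, without queuing.
    static boolean nestedCallRunsInline() {
        EventLoopSketch loop = new EventLoopSketch();
        AtomicBoolean inline = new AtomicBoolean();
        CountDownLatch done = new CountDownLatch(1);
        loop.runCallback(() -> {
            AtomicBoolean ran = new AtomicBoolean();
            loop.runCallback(() -> ran.set(true));
            inline.set(ran.get()); // true only if the nested callback already ran
            done.countDown();
        });
        await(done);
        loop.executor.shutdown();
        return inline.get();
    }
}
```

Funnelling every callback onto one thread is what lets handler code run without its own locking, which is the point of the inEventLoop() check.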
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
            boolean removed = false;
            try {
                remove0(ctx);
                ctx.callHandlerRemoved();
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }
            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }

    final void callHandlerAdded() throws Exception {
        if (setAddComplete()) {
            handler().handlerAdded(this);
        }
    }

ChannelInitializer.handlerAdded()
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            if (initChannel(ctx)) {
                removeState(ctx);
            }
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

This ends up calling initChannel(), the method that user code overrides.
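The pattern in ChannelInitializer (run the user's setup once, guard against re-entrance, then remove itself from the pipeline) can be modelled without Netty. The sketch below is illustrative only: `InitializerSketch`, `Pipeline`, and `Initializer` are hypothetical types, the pipeline is a plain list, and unlike the real class the guard set is never cleared afterwards.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

final class InitializerSketch {
    // Hypothetical pipeline: just an ordered list of handler objects.
    static final class Pipeline {
        final List<Object> handlers = new ArrayList<>();
    }

    // Hypothetical stand-in for ChannelInitializer.
    static final class Initializer {
        private final Set<Pipeline> initMap = new HashSet<>();
        private final Consumer<Pipeline> userInit;
        int calls;

        Initializer(Consumer<Pipeline> userInit) {
            this.userInit = userInit;
        }

        boolean handlerAdded(Pipeline p) {
            if (!initMap.add(p)) {
                return false; // guard against re-entrance
            }
            try {
                calls++;
                userInit.accept(p); // the user-defined initChannel() logic
            } finally {
                p.handlers.remove(this); // the initializer removes itself
            }
            return true;
        }
    }

    // Builds a pipeline whose initializer installs two handlers and then disappears.
    static Pipeline demo(Initializer[] holder) {
        Pipeline p = new Pipeline();
        Initializer init = new Initializer(pl -> {
            pl.handlers.add("decoder");
            pl.handlers.add("business");
        });
        p.handlers.add(init);
        init.handlerAdded(p);
        init.handlerAdded(p); // second call is ignored by the guard
        holder[0] = init;
        return p;
    }

    public static void main(String[] args) {
        Initializer[] holder = new Initializer[1];
        System.out.println(demo(holder).handlers); // prints [decoder, business]
        System.out.println(holder[0].calls);       // prints 1
    }
}
```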
2.3 Removing a ChannelHandler
Removing a handler in an authentication scenario
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/151a725932b04584be5d7e72207906a8.png)
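    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;

    public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            if (pass(msg)) {
                // Authenticated: this handler is no longer needed
                ctx.pipeline().remove(this);
            } else {
                ctx.close();
            }
        }

        // Placeholder check; a real implementation would validate the password
        private boolean pass(ByteBuf password) {
            return false;
        }
    }

If the password check passes, the handler removes itself; otherwise the connection is closed.
- Find the node to remove
- Unlink the node
- Fire the handler-removed callback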
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/a55311e745cf4f46b4d7433ff86cfc10.png)
Find the node to remove
    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }

    // Find the node that wraps the given handler
    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {
            if (ctx == null) {
                return null;
            }
            if (ctx.handler() == handler) {
                return ctx;
            }
            ctx = ctx.next;
        }
    }

Unlink the node
    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        assert ctx != head && ctx != tail;
        synchronized (this) {
            remove0(ctx);
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }
            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }

    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

Removing a node is an ordinary doubly-linked-list removal.
Fire the handler-removed callback
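The lookup-then-unlink sequence can be tried on a toy list. This is a sketch under assumptions rather than Netty code: `RemoveNodeSketch` and `Node` are hypothetical, handlers are plain objects, and the scan stops at the tail sentinel instead of running until null as the real context(handler) does.

```java
import java.util.NoSuchElementException;

final class RemoveNodeSketch {
    // Hypothetical node type standing in for AbstractChannelHandlerContext.
    static final class Node {
        final Object handler;
        Node prev;
        Node next;

        Node(Object handler) {
            this.handler = handler;
        }
    }

    final Node head = new Node(null);
    final Node tail = new Node(null);

    RemoveNodeSketch() {
        head.next = tail;
        tail.prev = head;
    }

    void addLast(Object handler) {
        Node n = new Node(handler);
        n.prev = tail.prev;
        n.next = tail;
        tail.prev.next = n;
        tail.prev = n;
    }

    // Like context(handler): scan from head.next and compare by identity, not equals().
    Node find(Object handler) {
        for (Node n = head.next; n != tail; n = n.next) {
            if (n.handler == handler) {
                return n;
            }
        }
        return null;
    }

    // Like getContextOrDie followed by remove0: fail loudly if absent, otherwise unlink.
    void remove(Object handler) {
        Node n = find(handler);
        if (n == null) {
            throw new NoSuchElementException(String.valueOf(handler));
        }
        n.prev.next = n.next;
        n.next.prev = n.prev;
    }

    // Number of nodes between the two sentinels.
    int size() {
        int count = 0;
        for (Node n = head.next; n != tail; n = n.next) {
            count++;
        }
        return count;
    }
}
```

Because head and tail are never removed, every removable node has both neighbours, so the two assignments in the unlink step need no null checks.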
    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        // Notify the complete removal.
        try {
            ctx.callHandlerRemoved();
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }

    final void callHandlerRemoved() throws Exception {
        try {
            // Only call handlerRemoved(...) if we called handlerAdded(...) before.
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }

    // ChannelInitializer.handlerRemoved()
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        initMap.remove(ctx);
    }

3.How the Pipeline starts propagating a channel event
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/91689b7a3c394665a35dcb1d6f8f615a.png)
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/de9f96febc024da38eead76470ea9327.png)
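ChannelHandlerContext has an abstract implementation, AbstractChannelHandlerContext, whose default concrete subclass is DefaultChannelHandlerContext. DefaultChannelHandlerContext has a handler member field, which is the wrapped handler shown in the figure above.
Handler events propagate as a chain of calls, implemented by ChannelHandlerContext. The pipeline exposes many fireChannelXXX methods; each one starts the corresponding event at the head node, and the event is then passed from handler to handler along the chain.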
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/5c7990f943b24fe282823cf6c2e9971c.png)
Take fireChannelRegistered() as an example
HeadContext.channelRegistered()
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/2d3173ed963f4be5aa33b6200886b311.png)
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/23b97b0163114b1fab8883372c60097a.png)
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/25489d952a7643838087fad7897664e7.png)
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/c1afd0de3d2643c293a663bfb5909153.png)
The event is handed from the handler given as the argument to the next handler, whose callback is then invoked.
![[Netty源码] Pipeline相关问题 (八)](https://img-blog.csdnimg.cn/7968c87a82ba402281ef4e1d220d63cb.png)
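The chained propagation described in this section can be modelled in a few lines. This is a simplified sketch, not Netty's implementation: `PropagationSketch`, `Handler`, and `Ctx` are hypothetical names, and only one inbound event is modelled. It assumes what the text states: the pipeline starts the event at head, and each handler decides whether to pass it on by calling the context's fire method.

```java
import java.util.ArrayList;
import java.util.List;

final class PropagationSketch {
    // Hypothetical inbound handler: it receives its context so it can forward the event.
    interface Handler {
        void channelRegistered(Ctx ctx);
    }

    // Hypothetical context node standing in for AbstractChannelHandlerContext.
    static final class Ctx {
        final String name;
        final Handler handler;
        final List<String> trace;
        Ctx next;

        Ctx(String name, Handler handler, List<String> trace) {
            this.name = name;
            this.handler = handler;
            this.trace = trace;
        }

        // Like ctx.fireChannelRegistered(): hand the event to the next node, if any.
        void fireChannelRegistered() {
            if (next != null) {
                next.invoke();
            }
        }

        // Record the visit, then run this node's handler callback.
        void invoke() {
            trace.add(name);
            handler.channelRegistered(this);
        }
    }

    // Builds head -> A -> B -> tail and fires one event from head.
    static List<String> run(boolean middleForwards) {
        List<String> trace = new ArrayList<>();
        Ctx head = new Ctx("head", Ctx::fireChannelRegistered, trace);
        Ctx a = new Ctx("A", ctx -> {
            if (middleForwards) {
                ctx.fireChannelRegistered();
            }
        }, trace);
        Ctx b = new Ctx("B", Ctx::fireChannelRegistered, trace);
        Ctx tail = new Ctx("tail", ctx -> { }, trace);
        head.next = a;
        a.next = b;
        b.next = tail;
        head.invoke(); // like pipeline.fireChannelRegistered(): the event starts at head
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [head, A, B, tail]
        System.out.println(run(false)); // prints [head, A]
    }
}
```

The second run shows that a handler which does not call the fire method stops the event at that point in the chain.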


