上面一篇文章介绍了 ChannelPipeline,它维护了一个有序的 ChannelHandler 列表。当 ChannelHandler 加入到 ChannelPipeline 的时候,会创建一个对应的 ChannelHandlerContext 并绑定,ChannelPipeline 实际维护的是和 ChannelHandlerContext 的关系,例如在 DefaultChannelPipeline:
1
2
3
4
5
6
7// public class DefaultChannelPipeline implements ChannelPipeline { //... final AbstractChannelHandlerContext head; // 头结点 final AbstractChannelHandlerContext tail; // 尾结点 }
DefaultChannelPipeline 会保存第一个ChannelHandlerContext以及最后一个ChannelHandlerContext的引用。而 AbstractChannelHandlerContext 中维护了 next 和 prev 指针:
1
2
3
4
5
6abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { //... volatile AbstractChannelHandlerContext next; // 前驱节点 volatile AbstractChannelHandlerContext prev; // 后置节点 }
这样 ChannelHandlerContext 之间形成了双向链表。
1.ChannelHandlerContext 方法
我们来看一下 ChannelHandlerContext 的继承关系:
它继承了 AttributeMap 用于存储信息,实现了 ChannelInboundInvoker 和 ChannelOutboundInvoker 可进行事件的传播。
PS:ChannelPipeline 和 ChannelHandlerContext 都同时继承了 ChannelInboundInvoker 和 ChannelOutboundInvoker接口
1.AttributeMap 接口的方法
1
2
3
4
5
6
7public interface AttributeMap { <T> Attribute<T> attr(AttributeKey<T> key); <T> boolean hasAttr(AttributeKey<T> key); }
2.ChannelInboundInvoker 接口的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public interface ChannelInboundInvoker { // 触发对下一个 ChannellnboundHandler 上的 channelRegistered() 方法的调用 ChannelInboundInvoker fireChannelRegistered(); // 触发对下一个 ChannellnboundHandler 上的 channelUnregistered() 方法的调用 ChannelInboundInvoker fireChannelUnregistered(); // 触发对下一个 ChannellnboundHandler 上的 channelActive() 方法(已连接)的调用 ChannelInboundInvoker fireChannelActive(); // 触发对下一个 ChannellnboundHandler 上的 channellnactive() 方法(已关闭)的调用 ChannelInboundInvoker fireChannelInactive(); // 触发对下一个 ChannellnboundHandler 上的 exceptionCaught (Throwable)方法的调用 ChannelInboundInvoker fireExceptionCaught(Throwable cause); // 触发对下一个 ChannellnboundHandler 上的 userEventTriggered (Object evt)方法的调用 ChannelInboundInvoker fireUserEventTriggered(Object event); // 触发对下一个 ChannellnboundHandler 上的 channelRead() 方法(已接收的消息)的调用 ChannelInboundInvoker fireChannelRead(Object msg); // 触发对下一个 ChannellnboundHandler 上的 channelReadComplete() 方法的调用 ChannelInboundInvoker fireChannelReadComplete(); // 触发对下一个 ChannellnboundHandler 上的 channelWritabilityChanged() 方法的调用 ChannelInboundInvoker fireChannelWritabilityChanged(); }
3.ChannelOutboundInvoker 接口的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public interface ChannelOutboundInvoker { // 绑定到给定的 SocketAddress,并返回 ChannelFuture ChannelFuture bind(SocketAddress localAddress); // 连接给定的 SocketAddress,并返回 ChannelFuture ChannelFuture connect(SocketAddress remoteAddress); // 从之前分配的 EventExecutor 注销,并返回 ChannelFuture ChannelFuture deregister(); // 从远程节点断开,并返回 ChannelFuture ChannelFuture disconnect(ChannelPromise promise); // 将数据从 Channel 读取到第一个入站缓冲区;如果读取成功则触发一个 channelRead 事件 // 并在最后一个消息被读取完成后,通知 ChannelInboundHandler 的 channelReadComplete 方法 ChannelOutboundInvoker read(); // 将写到一个临时队列中 ChannelFuture write(Object msg); // 将临时队列中的消息写到 Socket 缓冲区 ChannelOutboundInvoker flush(); // write + flush = 直接写到 Socket 缓冲区 ChannelFuture writeAndFlush(Object msg); //...还有一些不常用的方法就不列出来了 }
PS:关于 write() 和 flush() 的源码分析可以参考这篇文章…
4.ChannelHandlerContext 自己的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker { // 获取绑定这个实例的 Channel Channel channel(); // 获取调度事件的 EventExecutor EventExecutor executor(); // 获取这个实例的唯一名称 String name(); // 获取绑定这个实例的 ChannelHandler ChannelHandler handler(); // 如果所关联的 ChannelHandler 已被从 ChannelPipeline 中移除,则返回 true boolean isRemoved(); // 获取这个实例所关联的 ChannelPipeline ChannelPipeline pipeline(); // 获取和这这个实例相关联的 Channel 所配置的 ByteBufAllocator ByteBufAllocator alloc(); }
注:在我们平时编写 ChannelHandler 要将数据写到 Socket 中时,有两种方案:
ctx.channel().writeAndFlush
,将从 Pipeline 的尾部开始往前找 OutboundHandlerctx.writeAndFlush
会从当前 handler 往前找 OutboundHandler
2.ChannelHandlerContext 子类
来看 ChannelHandlerContext 继承类图:
1.AbstractChannelHandlerContext
是 ChannelHandlerContext 的一个抽象实现
-
定义了链表的关键 – next、prev 指针
-
定义了很多 ChannelHandlerContext 节点状态
-
实现了上面列出的所有方法,包括父接口 AttributeMap、ChannelInboundInvoker 、ChannelOutboundInvoker 及 ChannelHandlerContext 自己的方法(见下图)
注:由于 ChannelContextHandler 将 AttributeMap 和 ChannelInboundInvoker 接口的方法 @Override 了,所以我们在上图看到 attr() 和 fireXX() 都显示的是实现自 ChannelContextHandler 。
2.DefaultChannelHandlerContext
是 ChannelHandlerContext 的默认实现类,不过主要功能都在AbstractChannelHandlerContext 中已经实现好了,DefaultChannelHandlerContext 非常简单
3.HeadContext
是 ChannelPipeline 中的头节点,是一个比较特殊的节点,
- 它是 ChannelHandlerContext 的实现类,因此是一个 ChannelPipeline 内部链表节点
- 它实现了入站出站接口 ChannelOutboundHandler 和 ChannelInboundHandler,因此是一个双向处理器。
HeadContext 里面有很多方法。通过内部持有的 unsafe 对象来做具体的读、写、连接、绑定端口等IO事件,功能上看 HeadContext 会将事件传播到下一个入站处理器。
4.TailContext
是 ChannelPipeline 中的尾节点,也是一个比较特殊的节点
- 它是 ChannelHandlerContext 的实现类,因此是一个 ChannelPipeline 内部链表节点
- 它实现了 ChannelInboundHandler,因此是一个入站事件处理器,可处理入站事件
不过 TailContext 继承自ChannelInboundHandler 的很多入站方法都是空方法。TailContext 大部分情况下是什么都不做,有几个方法会将未处理的异常打印 Warn 日志。
3.事件传播方法 – fireXX() 源码分析
事件传播方法继承自 ChannelInboundInvoker 接口,在 AbstractChannelHandlerContext 抽象类中被实现。我们下面来分析一下 fireChannelRead() 方法的源码:
1
2
3
4
5
6@Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); return this; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50/** * 1.向后查找,如果 ChannelHandlerContext 为 inbound 的类型 就返回 */ private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while ((ctx.executionMask & mask) == 0); return ctx; } /** * 2.触发 ChannelPipeline 中后面一个 ChannelInboundHandler 的 channelRead 方法被调用 */ static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); // 获取 ChannelHandlerContext 的 EventExecutor EventExecutor executor = next.executor(); // 如果EventExecutor线程在EventLoop线程中,就直接调用 if (executor.inEventLoop()) { // 核心!! next.invokeChannelRead(m); // 反之则递交给executor执行 } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } /** * 3.调用下一个入站处理器的 channelRead 方法,将消息传递过去 */ private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { // 获取handler对象(ChannelInboundHandler)并且执行该对象的 channelRead 方法...... ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { // 通知 Inbound 事件的传播,发生异常 notifyHandlerException(t); } } else { fireChannelRead(msg); } }
实际上其他的 fireXX() 方法也是类似的,基本思想就是找到下一个 ChannelHandlerContext 节点然后调用其内部的ChannelHandler 对象对应的方法,其他类似方法就不一一分析了。
最后
以上就是神勇花卷最近收集整理的关于【Netty】原理分析:ChannelHandlerContext的全部内容,更多相关【Netty】原理分析内容请搜索靠谱客的其他文章。
发表评论 取消回复