我是靠谱客的博主 神勇花卷,这篇文章主要介绍【Netty】原理分析:ChannelHandlerContext,现在分享给大家,希望可以做个参考。

上面一篇文章介绍了 ChannelPipeline,它维护了一个有序的 ChannelHandler 列表。当 ChannelHandler 加入到 ChannelPipeline 的时候,会创建一个对应的 ChannelHandlerContext 并绑定,ChannelPipeline 实际维护的是和 ChannelHandlerContext 的关系,例如在 DefaultChannelPipeline:

复制代码
1
2
3
4
5
6
7
// public class DefaultChannelPipeline implements ChannelPipeline { //... final AbstractChannelHandlerContext head; // 头结点 final AbstractChannelHandlerContext tail; // 尾结点 }

DefaultChannelPipeline 会保存第一个ChannelHandlerContext以及最后一个ChannelHandlerContext的引用。而 AbstractChannelHandlerContext 中维护了 next 和 prev 指针:

复制代码
1
2
3
4
5
6
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { //... volatile AbstractChannelHandlerContext next; // 前驱节点 volatile AbstractChannelHandlerContext prev; // 后置节点 }

这样 ChannelHandlerContext 之间形成了双向链表。

1.ChannelHandlerContext 方法

我们来看一下 ChannelHandlerContext 的继承关系:

在这里插入图片描述
它继承了 AttributeMap 用于存储信息,实现了 ChannelInboundInvoker 和 ChannelOutboundInvoker 可进行事件的传播。

PS:ChannelPipeline 和 ChannelHandlerContext 都同时继承了 ChannelInboundInvoker 和 ChannelOutboundInvoker接口

1.AttributeMap 接口的方法

复制代码
1
2
3
4
5
6
7
public interface AttributeMap { <T> Attribute<T> attr(AttributeKey<T> key); <T> boolean hasAttr(AttributeKey<T> key); }

2.ChannelInboundInvoker 接口的方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public interface ChannelInboundInvoker { // 触发对下一个 ChannellnboundHandler 上的 channelRegistered() 方法的调用 ChannelInboundInvoker fireChannelRegistered(); // 触发对下一个 ChannellnboundHandler 上的 channelUnregistered() 方法的调用 ChannelInboundInvoker fireChannelUnregistered(); // 触发对下一个 ChannellnboundHandler 上的 channelActive() 方法(已连接)的调用 ChannelInboundInvoker fireChannelActive(); // 触发对下一个 ChannellnboundHandler 上的 channellnactive() 方法(已关闭)的调用 ChannelInboundInvoker fireChannelInactive(); // 触发对下一个 ChannellnboundHandler 上的 exceptionCaught (Throwable)方法的调用 ChannelInboundInvoker fireExceptionCaught(Throwable cause); // 触发对下一个 ChannellnboundHandler 上的 userEventTriggered (Object evt)方法的调用 ChannelInboundInvoker fireUserEventTriggered(Object event); // 触发对下一个 ChannellnboundHandler 上的 channelRead() 方法(已接收的消息)的调用 ChannelInboundInvoker fireChannelRead(Object msg); // 触发对下一个 ChannellnboundHandler 上的 channelReadComplete() 方法的调用 ChannelInboundInvoker fireChannelReadComplete(); // 触发对下一个 ChannellnboundHandler 上的 channelWritabilityChanged() 方法的调用 ChannelInboundInvoker fireChannelWritabilityChanged(); }

3.ChannelOutboundInvoker 接口的方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public interface ChannelOutboundInvoker { // 绑定到给定的 SocketAddress,并返回 ChannelFuture ChannelFuture bind(SocketAddress localAddress); // 连接给定的 SocketAddress,并返回 ChannelFuture ChannelFuture connect(SocketAddress remoteAddress); // 从之前分配的 EventExecutor 注销,并返回 ChannelFuture ChannelFuture deregister(); // 从远程节点断开,并返回 ChannelFuture ChannelFuture disconnect(ChannelPromise promise); // 将数据从 Channel 读取到第一个入站缓冲区;如果读取成功则触发一个 channelRead 事件 // 并在最后一个消息被读取完成后,通知 ChannelInboundHandler 的 channelReadComplete 方法 ChannelOutboundInvoker read(); // 将写到一个临时队列中 ChannelFuture write(Object msg); // 将临时队列中的消息写到 Socket 缓冲区 ChannelOutboundInvoker flush(); // write + flush = 直接写到 Socket 缓冲区 ChannelFuture writeAndFlush(Object msg); //...还有一些不常用的方法就不列出来了 }

PS:关于 write() 和 flush() 的源码分析可以参考这篇文章…

4.ChannelHandlerContext 自己的方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker { // 获取绑定这个实例的 Channel Channel channel(); // 获取调度事件的 EventExecutor EventExecutor executor(); // 获取这个实例的唯一名称 String name(); // 获取绑定这个实例的 ChannelHandler ChannelHandler handler(); // 如果所关联的 ChannelHandler 已被从 ChannelPipeline 中移除,则返回 true boolean isRemoved(); // 获取这个实例所关联的 ChannelPipeline ChannelPipeline pipeline(); // 获取和这这个实例相关联的 Channel 所配置的 ByteBufAllocator ByteBufAllocator alloc(); }

注:在我们平时编写 ChannelHandler 要将数据写到 Socket 中时,有两种方案:

  • ctx.channel().writeAndFlush,将从 Pipeline 的尾部开始往前找 OutboundHandler
  • ctx.writeAndFlush 会从当前 handler 往前找 OutboundHandler

2.ChannelHandlerContext 子类

来看 ChannelHandlerContext 继承类图:


1.AbstractChannelHandlerContext

是 ChannelHandlerContext 的一个抽象实现

  • 定义了链表的关键 – next、prev 指针

  • 定义了很多 ChannelHandlerContext 节点状态

  • 实现了上面列出的所有方法,包括父接口 AttributeMap、ChannelInboundInvoker 、ChannelOutboundInvoker 及 ChannelHandlerContext 自己的方法(见下图)


注:由于 ChannelContextHandler 将 AttributeMap 和 ChannelInboundInvoker 接口的方法 @Override 了,所以我们在上图看到 attr() 和 fireXX() 都显示的是实现自 ChannelContextHandler 。

2.DefaultChannelHandlerContext

是 ChannelHandlerContext 的默认实现类,不过主要功能都在AbstractChannelHandlerContext 中已经实现好了,DefaultChannelHandlerContext 非常简单

3.HeadContext

是 ChannelPipeline 中的头节点,是一个比较特殊的节点,

  • 它是 ChannelHandlerContext 的实现类,因此是一个 ChannelPipeline 内部链表节点
  • 它实现了入站出站接口 ChannelOutboundHandler 和 ChannelInboundHandler,因此是一个双向处理器。

HeadContext 里面有很多方法。通过内部持有的 unsafe 对象来做具体的读、写、连接、绑定端口等IO事件,功能上看 HeadContext 会将事件传播到下一个入站处理器。

4.TailContext

是 ChannelPipeline 中的尾节点,也是一个比较特殊的节点

  • 它是 ChannelHandlerContext 的实现类,因此是一个 ChannelPipeline 内部链表节点
  • 它实现了 ChannelInboundHandler,因此是一个入站事件处理器,可处理入站事件

不过 TailContext 继承自ChannelInboundHandler 的很多入站方法都是空方法。TailContext 大部分情况下是什么都不做,有几个方法会将未处理的异常打印 Warn 日志。

3.事件传播方法 – fireXX() 源码分析

事件传播方法继承自 ChannelInboundInvoker 接口,在 AbstractChannelHandlerContext 抽象类中被实现。我们下面来分析一下 fireChannelRead() 方法的源码:

复制代码
1
2
3
4
5
6
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); return this; }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/** * 1.向后查找,如果 ChannelHandlerContext 为 inbound 的类型 就返回 */ private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while ((ctx.executionMask & mask) == 0); return ctx; } /** * 2.触发 ChannelPipeline 中后面一个 ChannelInboundHandler 的 channelRead 方法被调用 */ static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); // 获取 ChannelHandlerContext 的 EventExecutor EventExecutor executor = next.executor(); // 如果EventExecutor线程在EventLoop线程中,就直接调用 if (executor.inEventLoop()) { // 核心!! next.invokeChannelRead(m); // 反之则递交给executor执行 } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } /** * 3.调用下一个入站处理器的 channelRead 方法,将消息传递过去 */ private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { // 获取handler对象(ChannelInboundHandler)并且执行该对象的 channelRead 方法...... ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { // 通知 Inbound 事件的传播,发生异常 notifyHandlerException(t); } } else { fireChannelRead(msg); } }

实际上其他的 fireXX() 方法也是类似的,基本思想就是找到下一个 ChannelHandlerContext 节点然后调用其内部的ChannelHandler 对象对应的方法,其他类似方法就不一一分析了。

最后

以上就是神勇花卷最近收集整理的关于【Netty】原理分析:ChannelHandlerContext的全部内容,更多相关【Netty】原理分析内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(115)

评论列表共有 0 条评论

立即
投稿
返回
顶部