概述
Channel接口针对Channel的读入和写出IO事件的处理,定义了两个拓展接口:ChannelInboundHandler用于定义对读入IO事件的处理,ChannelOutboundHandler用于定义写出IO事件的处理。
ChannelInboundHandler
ChannelInboundHandler接口定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59/** * {@link ChannelHandler} which adds callbacks for state changes. This allows the user * to hook in to state changes easily. */ public interface ChannelInboundHandler extends ChannelHandler { /** * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop} */ void channelRegistered(ChannelHandlerContext ctx) throws Exception; /** * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop} */ void channelUnregistered(ChannelHandlerContext ctx) throws Exception; /** * The {@link Channel} of the {@link ChannelHandlerContext} is now active */ void channelActive(ChannelHandlerContext ctx) throws Exception; /** * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its * end of lifetime. */ void channelInactive(ChannelHandlerContext ctx) throws Exception; /** * Invoked when the current {@link Channel} has read a message from the peer. */ void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; /** * Invoked when the last message read by the current read operation has been consumed by * {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further * attempt to read an inbound data from the current {@link Channel} will be made until * {@link ChannelHandlerContext#read()} is called. */ void channelReadComplete(ChannelHandlerContext ctx) throws Exception; /** * Gets called if an user event was triggered. */ void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; /** * Gets called once the writable state of a {@link Channel} changed. You can check the state with * {@link Channel#isWritable()}. */ void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; /** * Gets called if a {@link Throwable} was thrown. */ @Override @SuppressWarnings("deprecation") void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; }
ChannelInboundHandlerAdapter
-
ChannelInboundHandler的默认实现类,方便使用,用户通过拓展该类,只重写与自身关注的IO事件的处理的方法。ChannelInboundHandlerAdapter的每个方法的默认实现都是通过ChannelHandlerContext将IO事件或接收到的数据,传给所在的ChannelPipeline的下一个ChannelInboundHandler:
-
业务处理逻辑处理:用户可以通过拓展ChannelInboundHandlerAdapter,重写相应的方法来,生成新的子类的方式来定义业务需要的处理逻辑,Netty默认针对特定功能的处理,提供了一些ChannelInboundHandler的实现类,详见下面分析。
-
对于从Channel读入的数据,在调用channelRead方法处理时,默认实现也是传给下一个ChannelInboundHandler处理,不会销毁该数据对象,释放掉该数据所占用的空间的:
如果数据只需在当前ChannelInboundHandler处理,而不需要继续往下传输,则可以调用ReferenceCountUtil.release(msg)手动释放掉,或者拓展SimpleChannelInboundHandler类。 -
SimpleChannelInboundHandler
SimpleChannelInboundHandler为ChannelInboundHandlerAdapter的一个拓展实现,重写了channelRead方法,并提供了一个抽象方法channelRead0(在netty 5.0之后channelRead0方法名称变成了messageReceived)供用户实现自身的数据处理逻辑:
针对Channel读入的数据的处理,提供了两个功能:- 特定类型数据处理:用户通过拓展实现SimpleChannelInboundHandler来定义业务处理的ChannelInboundHandler时,通过指定泛型参数I,限制该ChannelInboundHandler只处理Channel读入的类型为I的数据;
- 自动释放数据:如果Channel读入的数据的类型与泛型参数I匹配,则在该ChannelInboundHandler处理掉了,在finally中,根据调用构造函数创建ChannelInboundHandler时,是否需要自动释放数据(默认为true),来自动进行数据空间的释放。
- 数据的保留:如果用户在实现channelRead0方法自定义数据处理逻辑时,需要将该数据传给下一个ChannelInboundHandler,则需要调用ReferenceCountUtil.retain(msg)方法,原理是将msg的引用计数加1,因为ReferenceCountUtil.release(msg)是将msg的引用计数减1,同时当引用计数变成0时,释放该数据:
1
2
3
4
5
6
7
8
9
10
11
12public class StringHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception { System.out.println(message); // 不释放数据,交给下一个ChannelInboundHandler继续处理 ReferenceCountUtil.retain(message); ctx.fireChannelRead(message); } }
ChannelOutboundHandler
ChannelOutboundHandler接口定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81/** * {@link ChannelHandler} which will get notified for IO-outbound-operations. */ public interface ChannelOutboundHandler extends ChannelHandler { /** * Called once a bind operation is made. * * @param ctx the {@link ChannelHandlerContext} for which the bind operation is made * @param localAddress the {@link SocketAddress} to which it should bound * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */ void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception; /** * Called once a connect operation is made. * * @param ctx the {@link ChannelHandlerContext} for which the connect operation is made * @param remoteAddress the {@link SocketAddress} to which it should connect * @param localAddress the {@link SocketAddress} which is used as source on connect * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */ void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception; /** * Called once a disconnect operation is made. * * @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */ void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; /** * Called once a close operation is made. * * @param ctx the {@link ChannelHandlerContext} for which the close operation is made * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */ void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; /** * Called once a deregister operation is made from the current registered {@link EventLoop}. * * @param ctx the {@link ChannelHandlerContext} for which the close operation is made * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */ void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; /** * Intercepts {@link ChannelHandlerContext#read()}. */ void read(ChannelHandlerContext ctx) throws Exception; /** * Called once a write operation is made. The write operation will write the messages through the * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once * {@link Channel#flush()} is called * * @param ctx the {@link ChannelHandlerContext} for which the write operation is made * @param msg the message to write * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */ void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception; /** * Called once a flush operation is made. The flush operation will try to flush out all previous written messages * that are pending. * * @param ctx the {@link ChannelHandlerContext} for which the flush operation is made * @throws Exception thrown if an error occurs */ void flush(ChannelHandlerContext ctx) throws Exception; }
ChannelOutboundHandlerAdapter
与ChannelInboundHandlerAdapter的作用类似,方法默认实现也是通过ChannelHandlerContext将写出的数据,交给下一个ChannelOutboundHandler处理。
ChannelDuplexHandler
同时具有ChannelInboundHandler和ChannelInboundHandler的功能,可以实现相应方法,对Channel的读入数据和写出数据都进行处理,默认实现也是交给下一个ChannelHandler处理。
Netty提供的ChannelHandler实现
Netty在handler子模块,针对不同的功能,包括流量控制,数据flush,ip过滤,日志,ssl,大数据流处理,超时心跳检测,拥塞控制,提供了相应的ChannelHandler实现类,如图:
- 示例:心跳检测与长连接维持
Netty源码分析-基于Netty的心跳检测机制实现长连接
最后
以上就是老实舞蹈最近收集整理的关于Netty源码分析-数据处理器ChannelInboundHandler和ChannelOutboundHandler的全部内容,更多相关Netty源码分析-数据处理器ChannelInboundHandler和ChannelOutboundHandler内容请搜索靠谱客的其他文章。
发表评论 取消回复