关于长连接的一些介绍
长连接的应用场景非常的广泛,比如监控系统,IM系统,即时报价系统,推送服务等等。像这些场景都是比较注重实时性,如果每次发送数据都要进行一次DNS解析,建立连接的过程肯定是极其影响体验。
长连接的维护必然需要一套机制来控制。比如 HTTP/1.0 通过在 header 头中添加 Connection:Keep-Alive参数,如果当前请求需要保活则添加该参数作为标识,否则服务端就不会保持该连接的状态,发送完数据之后就关闭连接。HTTP/1.1以后 Keep-Alive 是默认打开的。
Netty 是 基于 TCP 协议开发的,在四层协议 TCP 协议的实现中也提供了 keepalive 报文用来探测对端是否可用。TCP 层将在定时时间到后发送相应的 KeepAlive 探针以确定连接可用性
Netty状态回调代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46class ClientHandler : ChannelInboundHandlerAdapter() { /** 客户端请求的心跳命令 */ private val heartBeat = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("heartbeat", CharsetUtil.UTF_8)) override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { //接收到消息 } override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { //添加心跳 if (evt is IdleStateEvent) { if (IdleState.WRITER_IDLE?.equals(evt.state())) { ctx.writeAndFlush(heartBeat.duplicate()) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) } } super.userEventTriggered(ctx, evt) } override fun channelActive(ctx: ChannelHandlerContext) { //连接成功回调 ctx.fireChannelActive() } override fun channelInactive(ctx: ChannelHandlerContext) { super.channelInactive(ctx) //中途断开回调 } override fun exceptionCaught( ctx: ChannelHandlerContext, cause: Throwable ) { //连接抛出异常回调 cause.printStackTrace() ctx.close() } override fun handlerRemoved(ctx: ChannelHandlerContext) { super.handlerRemoved(ctx) //与服务断开连接时的回调 NettyObserverManager.instance.notifyObserver(ctx)//观察者发送断开通知 代码在下面 } }
添加回调的管理类
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15class SimpleChatClientInitializer : ChannelInitializer<SocketChannel>() { @Throws(Exception::class) override fun initChannel(ch: SocketChannel) { val pipeline = ch.pipeline() //心跳每次像服务端发送的频率 pipeline.addLast(IdleStateHandler(4, 4, 4, TimeUnit.SECONDS)) pipeline.addLast("decoder", StringDecoder()) pipeline.addLast("encoder", StringEncoder()) //添加自定义的监听回调对象 pipeline.addLast("handler", ClientHandler()) } }
使用服务开启长连接
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150class NotifyService : Service(), ChannelFutureListener, NettyObserverListener { private var channel: Channel? = null private var host = "xxx.xxx.xxx"//与后台统一的IP地址 private val port = 0//与后台统一的端口号 private var nio: NioEventLoopGroup? = null private val clientInitializer = SimpleChatClientInitializer() private val grayServiceId = 1001 //当前服务对象 private val mBinder = ClientBinder() override fun onBind(intent: Intent): IBinder { return mBinder } override fun onCreate() { super.onCreate() //添加观察者 NettyObserverManager.instance.add(this) init() } override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { //设置前台服务提高优先级 if (Build.VERSION.SDK_INT < Build.VERSION_CODES.O) { //Android4.3 - Android8.0,隐藏Notification上的图标 val innerIntent = Intent(this, GrayInnerService::class.java) startService(innerIntent) startForeground(grayServiceId, Notification()) } else { //Android8.0以上app启动后通知栏会出现一条"正在运行"的通知 val channel = NotificationChannel( "com.guanwei.pddemo", "notify", NotificationManager.IMPORTANCE_HIGH ) val manager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager manager.createNotificationChannel(channel) val notification = Notification.Builder( applicationContext, "com.guanwei.pddemo" ).build() startForeground(grayServiceId, notification) } return START_STICKY } /** * 服务在后台灰色保活 */ inner class GrayInnerService : Service() { override fun onBind(intent: Intent?): IBinder? { return null } override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { startForeground(grayServiceId, Notification()) stopForeground(true) stopSelf() return super.onStartCommand(intent, flags, startId) } } private fun init() { Thread { kotlin.run { if (nio == null) { nio = NioEventLoopGroup() } //开始连接 doConnect() } }.start() } private fun doConnect() { try { channel = Bootstrap().run { //连接建立 group(nio) channel(NioSocketChannel::class.java) handler(clientInitializer) connect( host, port ).sync().channel() } val map = HashMap<String, Long>() map["id"] = SPUtils.getInstance().getLong("userId") val json = JSONObject(map as Map<*, *>).toString() sendMessage(json) } catch (e: Exception) { e.stackTrace } } fun sendMessage(msg: String) { //向服务端发送消息 channel!!.writeAndFlush(msg).addListener(this) } /** * 连接失败回调 */ override fun operationComplete(future: ChannelFuture) { if (!future.isSuccess) { //连接失败重新连接 //每三秒进行一次重连接 future.channel().eventLoop().schedule({ kotlin.run { doConnect() } }, 3, TimeUnit.SECONDS) } else { LogUtils.a("连接成功") } } override fun nettyObserverUpData(v: ChannelHandlerContext) { //每三秒进行一次重连接 v.channel().eventLoop().schedule({ kotlin.run { doConnect() } }, 3, TimeUnit.SECONDS) } override fun onDestroy() { super.onDestroy() //移除观察者 NettyObserverManager.instance.remove(this) } /** * 与activity进行交互 */ inner class ClientBinder : Binder() { public fun getService(): NotifyService { return this@NotifyService } public fun sendMsg(msg: String) { sendMessage(msg) } } }
封装观察者进行状态监听与通知
复制代码
1
2
3
4
5
6
7
8
9
10/** * netty状态观察者接口 */ interface NettyObserverListener { /** * 刷新操作 */ fun nettyObserverUpData(v: ChannelHandlerContext) }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/** * netty观察者操作接口 */ interface NettySubjectListener { /** * 添加 */ fun add(nettyObserverListener: NettyObserverListener) /** * 通知内容 */ fun notifyObserver(t: ChannelHandlerContext) /** * 删除 */ fun remove(nettyObserverListener: NettyObserverListener) }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39class NettyObserverManager : NettySubjectListener { /** * 数据集合 */ private val list = ArrayList<NettyObserverListener>() /** * 添加 */ override fun add(nettyObserverListener: NettyObserverListener) { list.add(nettyObserverListener) } /** * 更新 */ override fun notifyObserver(t: ChannelHandlerContext) { //数据更新 for (ol in list) { ol.nettyObserverUpData(t) } } /** * 移除 */ override fun remove(nettyObserverListener: NettyObserverListener) { list.remove(nettyObserverListener) } companion object { val instance: NettyObserverManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { NettyObserverManager() } } }
在Activity中开启、绑定服务
服务开启与绑定所需要的参数如何获取就不多讲了,不会的建议回去重新学一下子服务
复制代码
1
2
3
4
5
6
7
8
9//先开启服务 startService(iIntent) //绑定服务 bindService( iIntent, myConnection, Context.BIND_AUTO_CREATE )
复制代码
1
2
3
4
5
6
7
8override fun onDestroy() { super.onDestroy() //服务解绑 unbindService(myConnection) //关闭服务 stopService(iIntent) }
最后
以上就是清脆黑猫最近收集整理的关于Netty——实现Android客户端长连接的全部内容,更多相关Netty——实现Android客户端长连接内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复