我是靠谱客的博主 耍酷手套,这篇文章主要介绍MQTT协议测试工具及核心代码,现在分享给大家,希望可以做个参考。

基于MQTTnet做了个MQTT协议测试程序,本程序包括服务的和两个客户端,方便在不引入外部支撑的情况下测试MQTT协议。
测试软件界面如下
在这里插入图片描述

1:启动MQTT服务

在Server区域,
启动服务可以选择MQTT服务使用的端口、用户名、密码。
如有有客户端需要连接服务,需要把这些信息告诉他们。
右边tab页有启动工程的详细日志。
在这里插入图片描述

2:MQTT客户端

在该测试程序中实现了两个客户端,方便测试使用,两个客户端功能万千一致,以客户端1为例介绍配置过程中的相关参数。
在这里插入图片描述
首先是需要连接的MQTT服务器IP和端口,这些信息一般有服务端提供,本程序直接连接本机的服务端,因此IP配置为127.0.0.1,端口和服务器端口一致12345。
其次,根据MQTT协议,连接服务器时需要提供用户名、密码用于鉴权认证。
最后,需要一个终端唯一标识,即客户标识,该标识在一个服务器下需要保证唯一。
到此和MQTT服务器连接相关参数就完备了,可以连接服务器了。
Qos是遗嘱消息服务质量等级,取值即含义如下
在这里插入图片描述
RETAIN: 保留标志位,如果为1,服务器存储最新一条RETAIN消息,以便分发给新的订阅者

MQTT是发布订阅模式的,客户端如果要收到消息需要先订阅对应的主题,订阅主题数量是没有限制的。
例如:client1订阅主题client1/sub、client2/sub
client2订阅主题client2/sub
然后在已订主题的下拉框中就可以看到本客户端订阅的主题。
如何要取消对应的主题,在下拉列表中先择,然后点击取消,就会取消订阅。
发布主题需要设定主题和发布内容。
例如用client1发布:
主题:client2/sub
内容:新信息。
例如用client2发布:
主题:client1/sub
内容:test
根据刚才设置的订阅信息,两个客户端收到的数据如下:
在这里插入图片描述
我们可以看到主题:client2/sub在两个客户端都收到了。
自此,测试功能基本就介绍完了。

3:程序功能实现

该程序是基于MQTTnet库最的。
因为要实现服务端和客户端。因此创建连个类:MQTTServer、MQTTClients,这部分代码也是在网上找的,但功能实现比较完备,所有我就直接使用了。
服务端MQTTServer

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
public class MQTTServer { public StringBuilder sb = new StringBuilder(""); public MqttServer mqttServer = null; public bool MqttServer_runflag = false; public async void StartMqttServer(int port = 12345, string user = "123", string passwd = "123") { if (mqttServer == null) { var optionsBuilder = new MqttServerOptionsBuilder() .WithDefaultEndpoint().WithDefaultEndpointPort(port).WithConnectionValidator( c => { var currentUser = user; var currentPWD = passwd; if (currentUser == null || currentPWD == null) { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } if (c.Username != currentUser) { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } if (c.Password != currentPWD) { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } c.ReasonCode = MqttConnectReasonCode.Success; }).WithSubscriptionInterceptor( c => { c.AcceptSubscription = true; }).WithApplicationMessageInterceptor( c => { c.AcceptPublish = true; }); mqttServer = new MqttFactory().CreateMqttServer() as MqttServer; mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted); mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped); mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected); mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected); mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(OnMqttServerClientSubscribedTopic); mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic); mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServer_ApplicationMessageReceived); await mqttServer.StartAsync(optionsBuilder.Build()); } } public async void StopMqttServer() { await mqttServer?.StopAsync(); } public async void PublishMqttTopic(string topic, string payload) { var message = new MqttApplicationMessage() { Topic = topic, Payload = Encoding.UTF8.GetBytes(payload) }; await mqttServer.PublishAsync(message); } public void OnMqttServerStarted(EventArgs e) { sb.Append("MQTT服务启动完成!rn"); } public void OnMqttServerStopped(EventArgs e) { sb.Append("MQTT服务停止完成!rn"); } public void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs e) { sb.Append($"客户端[{e.ClientId}]已连接rn"); } public void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs e) { sb.Append($"客户端[{e.ClientId}]已断开连接!rn"); //PublishMqttTopic("client/Disconnected", $"客户端[{ e.ClientId}]已断开连接"); } public void OnMqttServerClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e) { //sb.Append($"客户端[{e.ClientId}]已成功订阅主题[{e.TopicFilter}]!rn"); sb.Append($"客户端[{e.ClientId}]已成功订阅主题[{e.TopicFilter.Topic}]!rn"); } public void OnMqttServerClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e) { sb.Append($"客户端[{e.ClientId}]已成功取消订阅主题[{e.TopicFilter}]!rn"); } public void OnMqttServer_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { sb.Append($"客户端[{e.ClientId}]>> 主题:{e.ApplicationMessage.Topic} rn"); //Console.WriteLine($"客户端[{e.ClientId}]>> 主题:{e.ApplicationMessage.Topic} 负荷:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos:{e.ApplicationMessage.QualityOfServiceLevel} 保留:{e.ApplicationMessage.Retain}"); } }

客户端MQTTClients

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
public class MQTTClients { private MqttClient mqttClient = null; public StringBuilder sb = new StringBuilder(""); public async Task<MqttClientConnectResultCode> ClientStart(string tcpServer, int tcpPort, string mqttUser, string mqttPassword, String ClientId = "01001") { try { var mqttFactory = new MqttFactory(); //https://www.cnblogs.com/zhaoqm999/p/12960677.html var options = new MqttClientOptions { ClientId = ClientId, ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311, ChannelOptions = new MqttClientTcpOptions { Server = tcpServer, Port = tcpPort }, WillDelayInterval = 10, WillMessage = new MqttApplicationMessage() { Topic = $"LastWill/121313", Payload = Encoding.UTF8.GetBytes("I Lost the connection!" + ClientId), QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce } }; if (options.ChannelOptions == null) { throw new InvalidOperationException(); } if (!string.IsNullOrEmpty(mqttUser)) { options.Credentials = new MqttClientCredentials { Username = mqttUser, Password = Encoding.UTF8.GetBytes(mqttPassword) }; } options.CleanSession = true; options.KeepAlivePeriod = TimeSpan.FromSeconds(5); mqttClient = mqttFactory.CreateMqttClient() as MqttClient; mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisConnected); mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnSubscriberMessageReceived); MqttClientAuthenticateResult result = await mqttClient.ConnectAsync(options); return result.ResultCode; } catch (Exception ex) { return MqttClientConnectResultCode.ServerBusy; //lbxMonitor.BeginInvoke(_updateMonitorAction, // Logger.TraceLog(Logger.Level.Fatal, $"客户端尝试连接出错.>{ex.Message}")); } } public async Task<int> CLientConnect(string tcpServer, int tcpPort, string mqttUser, string mqttPassword, String ClientId = "01001") { MqttClientConnectResultCode resultCode = await ClientStart(tcpServer, tcpPort, mqttUser, mqttPassword, ClientId); if(resultCode == MqttClientConnectResultCode.Success) { return 0; } else { return 1; } } public async Task ClientStop() { try { if (mqttClient == null) return; await mqttClient.DisconnectAsync(); mqttClient = null; } catch (Exception ex) { } } public void OnMqttClientConnected(MqttClientConnectedEventArgs e) { //Console.WriteLine($"客户端[{e.ClientId}]已断开连接!"); //btnConnect.Text = "Connected"; ; //btnConnect.BackColor = Color.LightGreen; //btnConnect.Tag = 1; } public void OnMqttClientDisConnected(MqttClientDisconnectedEventArgs e) { //Console.WriteLine($"客户端[{e.ClientId}]已断开连接!"); } public async Task ClientPublishMqttTopic(string topic, string payload, int qos = 1, bool retain = false) { try { var message = new MqttApplicationMessage() { Topic = topic, Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("HH:mm:ss:ffff") + ":" + payload), QualityOfServiceLevel = (MqttQualityOfServiceLevel)qos,// cmbQos.SelectedIndex, Retain = retain//bool.Parse(cmbRetain.SelectedItem.ToString()) }; await mqttClient.PublishAsync(message); //lbxMonitor.BeginInvoke(_updateMonitorAction, // Logger.TraceLog(Logger.Level.Info, string.Format("客户端[{0}]发布主题[{1}]成功!", mqttClient.Options.ClientId, topic))); } catch (Exception ex) { //lbxMonitor.BeginInvoke(_updateMonitorAction, // Logger.TraceLog(Logger.Level.Fatal, string.Format("客户端[{0}]发布主题[{1}]异常!>{2}", mqttClient.Options.ClientId, topic, ex.Message))); } } public async Task ClientSubscribeTopic(string topic) { await mqttClient.SubscribeAsync(topic); //lbxMonitor.BeginInvoke(_updateMonitorAction, // Logger.TraceLog(Logger.Level.Info, string.Format("客户端[{0}]订阅主题[{1}]成功!", mqttClient.Options.ClientId, topic))); } public async Task ClientUnSubscribeTopic(string topic) { await mqttClient.UnsubscribeAsync(topic); //lbxMonitor.BeginInvoke(_updateMonitorAction, // Logger.TraceLog(Logger.Level.Info, string.Format("客户端[{0}]取消主题[{1}]成功!", mqttClient.Options.ClientId, topic))); } /// <summary> /// 当客户端接收到所订阅的主题消息时 /// </summary> /// <param name="e"></param> private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs e) { string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); string Topic = e.ApplicationMessage.Topic; string QoS = e.ApplicationMessage.QualityOfServiceLevel.ToString(); string Retained = e.ApplicationMessage.Retain.ToString(); Console.WriteLine(Topic +" " +text); sb.Append(DateTime.Now.ToString("HH:mm:ss:ffff") + ">>" + Topic + " " + text); //lbxMonitor.BeginInvoke(_updateMonitorAction, // Logger.TraceLog(Logger.Level.Info, "MessageReceived >>Topic:" + Topic + "; QoS: " + QoS + "; Retained: " + Retained)); //lbxMonitor.BeginInvoke(_updateMonitorAction, // Logger.TraceLog(Logger.Level.Info, "MessageReceived >>Msg: " + text)); } }

然后在Winform的界面程序中定义使用。

最后

以上就是耍酷手套最近收集整理的关于MQTT协议测试工具及核心代码的全部内容,更多相关MQTT协议测试工具及核心代码内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(98)

评论列表共有 0 条评论

立即
投稿
返回
顶部