我是靠谱客的博主 追寻外套,这篇文章主要介绍MQTT协议初识——简单收发第一部分:认识MQTT第二部分准备工作第三部分开发,现在分享给大家,希望可以做个参考。

第一部分:认识MQTT

先来一段百度文库的介绍

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和制动器(比如通过Twitter让房屋联网)的通信协议。
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:

我的认识

再我看来MQTT就有一点类似于java设计模式中的观察者模式,进行注册订阅信息后,当有类似消息进入队列时就会监听到,大概我画了一个草图:手动绘图板,所以比较丑^_^
这里写图片描述
消息接收者(消费者)会注册一个(或多个)感兴趣的主题,然后当信息发布者(生产者)发布一个消息后,注册过该主题的消息接收者(消费者),就可以获取到这条消息,MQTT就是针对这种场景,对tcp进行了一层封装。

MQTT协议详解

https://mcxiaoke.gitbooks.io/mqtt-cn/content/

第二部分准备工作

  1. 首先要准备apollo作为消息队列
    安装流程请参考:http://blog.csdn.net/whb3299065/article/details/79092095
  2. 准备jar:http://download.csdn.net/download/whb3299065/10211343为了方便,我把所有用到的东西发布了一份。

    复制代码
    1
    2
    3
    4
    5
    6
    7
    jarMaven地址 <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency>

第三部分开发

我初学的时候发现很多人喜欢将DOME写成客户端已服务器两个部分,对我理解造成了一些困扰,其实QMTT并没有单纯意义上的服务和客户,我们应该把它理解成平级对待,大家都可以向apollo发送消息,也都可以从apollo中订阅主题。(订阅/发布)

一、创建监听

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class PushCallback implements MqttCallback{ //连接丢失:一般用与重连 public void connectionLost(Throwable throwable) { System.out.println("丢失连接"); } //消息到达:指收到消息 public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { //(发布)publish后会执行到这里,发送状态 System.out.println("deliveryComplete---------" + token.isComplete()); } }

二、接收消息(如果你不好理解,可以暂且把它理解成客户端):

import

复制代码
1
2
3
4
5
6
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import top.whbweb.mqtt.callback.PushCallback; import java.util.UUID;
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) { try { //apollo地址 String HOST = "tcp://127.0.0.1:61613"; //要订阅的主题 String TOPIC1="abc"; //指你Apollo中的用户名密码 String userName="admin"; String pwd="password"; String clientid =UUID.randomUUID().toString().replace("-",""); MqttClient client=new MqttClient(HOST,clientid,new MemoryPersistence()); // MQTT的连接对象 MqttConnectOptions options = new MqttConnectOptions(); //设置连接参数 //清除session回话 options.setCleanSession(false); options.setUserName(userName); options.setPassword(pwd.toCharArray()); //超时设置 options.setConnectionTimeout(10); //心跳保持时间 options.setKeepAliveInterval(20); //遗嘱:当该客户端端口连接时,会向whb主题发布一条信息 options.setWill("whb","我挂了,你加油".getBytes(),1,true); //监听对象:自己创建 client.setCallback(new PushCallback()); //打开连接 client.connect(options); //设置消息级别 int[] Qos={1}; //订阅主题 String[] topics={TOPIC1}; client.subscribe(topics,Qos); } catch (MqttException e) { e.printStackTrace(); } }

发送端

先建立连接,然后准备信息(主题,内容,级别),最后进行发送

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import top.whbweb.mqtt.callback.PushCallback; import java.io.UnsupportedEncodingException; import java.util.Scanner; public class SendOut { //tcp://MQTT安装的服务器地址:MQTT定义的端口号 public static final String HOST = "tcp://127.0.0.1:61613"; //定义一个主题 public static final String TOPIC = "topic11"; //定义MQTT的ID,可以在MQTT服务配置中指定 private static final String clientid = "server11"; private MqttMessage message; public static final String TOPIC1 = "topic1"; public static final String userName = "admin"; public static final String pwd = "password"; public MqttClient client; private MqttTopic topic; public SendOut() { try { client = new MqttClient(HOST, clientid, new MemoryPersistence()); connect(); } catch (MqttException e) { e.printStackTrace(); } } //发布消息 public void publish(MqttTopic topic, MqttMessage message) throws MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); //打印发送状态 System.out.println("message is published completely!" + token.isComplete()); } //建立连接:参数与订阅端相似 private void connect() throws MqttException { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(pwd.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); client.setCallback(new PushCallback()); client.connect(options); } public static void main(String[] args) throws MqttException, UnsupportedEncodingException { SendOut service = new SendOut(); Scanner sc = new Scanner(System.in); service.topic = service.client.getTopic(TOPIC); service.message = new MqttMessage(); //确保被收到一次 service.message.setQos(1); service.message.setPayload("如果世界漆黑,其实我很美".getBytes("UTF-8")); service.publish(service.topic, service.message); } }

这样,订阅者就可以接收到我们发送的消息了。

最后

以上就是追寻外套最近收集整理的关于MQTT协议初识——简单收发第一部分:认识MQTT第二部分准备工作第三部分开发的全部内容,更多相关MQTT协议初识——简单收发第一部分内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(90)

评论列表共有 0 条评论

立即
投稿
返回
顶部