MQTT简单使用总结
- 前言
- 一、MQTT工具
- 二、使用步骤
- 1.引入库
- 2.注入配置信息
- 3.MQTT消费者
- 4.MQTT回调函数
- 总结
前言
本文介绍MQTT消息队列的简单使用
一、MQTT工具
MQTT的工具有很多,这里选择MQTTX测试: MQTTX客户端
二、使用步骤
1.引入库
复制代码
1
2
3
4
5<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
2.注入配置信息
复制代码
1
2
3
4
5
6
7
8mqtt: host: tcp://127.0.0.1 clientid: admin username: admin password: admin timeout: 10000 keepalive: 60
这里选择自己的配置信息
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public class MqttCofigBean { @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.host}") private String hostUrl; @Value("${mqtt.clientid}") private String clientId; @Value("${mqtt.timeout}") private int completionTimeout; }
3.MQTT消费者
主要功能是初始化连接信息,推送MQTT消息,订阅MQTT主题.
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145@Slf4j @Component public class MqttConsumer implements ApplicationRunner { private static MqttClient client; private static MqttTopic mqttTopic; private static volatile MqttConsumer mqttConsumer = null; public static MqttConsumer getInstance() { if (null == mqttConsumer) { synchronized (MqttConsumer.class) { if (null == mqttConsumer) { mqttConsumer = new MqttConsumer(); } } } return mqttConsumer; } /** * MQTT连接属性配置对象 */ @Autowired public MqttCofigBean mqttCofigBean; /** * 初始化参数配置 */ @Override public void run(ApplicationArguments args) throws Exception { log.info("初始化启动MQTT连接"); this.connect(); } /** * 用来连接服务器 * @throws Exception 异常 */ private void connect() throws Exception { client = new MqttClient(mqttCofigBean.getHostUrl(), mqttCofigBean.getClientId() + "_" + StrUtil.uuid(), new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttCofigBean.getUsername()); options.setPassword(mqttCofigBean.getPassword().toCharArray()); //是否清除session options.setCleanSession(true); // 设置超时时间 options.setConnectionTimeout(30); // 设置会话心跳时间 options.setKeepAliveInterval(60); try { String[] msgtopic = MqttCofigBean.SUB_TOPICS; int[] qos = new int[msgtopic.length]; client.setCallback(new TopMsgCallback(client, options, msgtopic, qos)); client.connect(options); log.info("MQTT连接成功:" + mqttCofigBean.getClientId()); if (msgtopic.length != 0 && !"".equals(msgtopic[0])) { //订阅消息 for (int i = 0; i < msgtopic.length; i++) { qos[i] = 0; } client.subscribe(msgtopic, qos); } } catch (Exception e) { log.error("MQTT连接异常:" + e); } } /** * 重新连接 * @throws Exception 异常 */ public void reConnect() throws Exception { if (null != client) { this.connect(); } } /** * 发布,默认qos为0,非持久化 * * @param topic 主题 * @param pushMessage 推送消息 */ public void publish(String topic, String pushMessage) throws Exception { publish(0, false, topic, pushMessage); } /** * 发布 * * @param qos qos * @param retained 保留 * @param topic 主题 * @param pushMessage 推送消息 */ public void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) { log.error("topic not exist"); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅某个主题 * * @param topic 主题 * @param qos qos */ public void subscribe(String topic, int qos) { try { log.info("topic:" + topic); client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } public MqttClient getClient() { return client; } public MqttTopic getMqttTopic() { return mqttTopic; } }
4.MQTT回调函数
回调函数主要是负责,连接中断后重连,收到消息的处理,也可以使用监听器的方式,来处理收到消息的业务在处理.
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83@Slf4j public class TopMsgCallback implements MqttCallback { /** * MqttClient */ private MqttClient client; /** * MqttConnectOptions */ private MqttConnectOptions options; /** * topic数组 */ private String[] topic; /** * qos */ private int[] qos; public TopMsgCallback() { } public TopMsgCallback(MqttClient client, MqttConnectOptions options) { this.client = client; this.options = options; } public TopMsgCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) { this.client = client; this.options = options; this.topic = topic; this.qos = qos; } /** * 断开重连 */ @Override public void connectionLost(Throwable cause) { log.info("MQTT连接断开,发起重连"); while (true) { try { Thread.sleep(30000); client.connect(options); log.info("MQTT重新连接成功:" + client); String[] msgtopic = MqttCofigBean.SUB_TOPICS; int[] qos = new int[msgtopic.length]; if (msgtopic.length != 0 && !"".equals(msgtopic[0])) { //订阅消息 for (int i = 0; i < msgtopic.length; i++) { qos[i] = 0; } client.subscribe(msgtopic, qos); } break; } catch (Exception e) { e.printStackTrace(); } } } /** * 接收到消息调用令牌中调用 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { } /** * 消息处理 * 封装数据,储存存活设备存入Redis,推送数据到kafka */ @Override public void messageArrived(String topic, MqttMessage message) { JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(message)); // TODO 升级结果推送至业务处理 log.info("接收到的消息是{}", message); } }
总结
没有总结
最后
以上就是负责鼠标最近收集整理的关于MQTT简单使用总结前言一、MQTT工具二、使用步骤总结的全部内容,更多相关MQTT简单使用总结前言一、MQTT工具二、使用步骤总结内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复