我是靠谱客的博主 灵巧水蜜桃,这篇文章主要介绍通过LoRa模块SX1268点对点通信,树莓派接收并发送数据到阿里云物联网平台,现在分享给大家,希望可以做个参考。

通过LoRa模块SX1268点对点通信,树莓派接收并发送数据到阿里云物联网平台

前言关于如何使用树莓派将设备上云,我前一篇博客有详细介绍了。这里我主要记录SX1268模块点对点通信的使用和如何通过串口发送和接收数据并上传到阿里云物联网平台

SX1268模块在使用之前需要对它进行配置一些参数:
1、首先将SX1268模块的工作模式设置为配置模式:M0 = 0,M1 = 1 在这里插入图片描述
然后使用usb线连接到PC机,打开串口
2、在串口监视器输入以下命令:
c1 05 01 01
c1 00 04 03 04 00 61
在这里插入图片描述在这里插入图片描述
!我这里配置信道为01,模块地址为0x0304、网络地址为0x00、串口为9600 8N1、空速为1.2K,俩个SX1268模块配置的参数都要相同。配置好之后,这俩个模块就可以正常的通过串口收发数据了。

3、配置好之后就接线,PC机通过usb线接一个SX1268模块,树莓派端直接把SX1268模块插进去就行,进入exmaple示例代码中的thing_alinkk.py文件中,修改设备三元组和把串口通信程序加进去,完整代码在下面,运行成功之后就可以通过PC端串口监视器看到树莓派发过来的数据,PC端发送数据过去之后在树莓派上面可以正常打印出来,而且在阿里云物联网平台上面也可以看到是有数据上传过来了的。

完整代码。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import sys import re from linkkit import linkkit import threading import traceback import inspect import time import logging import serial #串口通信引入的包 # config log __log_format = '%(asctime)s-%(process)d-%(thread)d - %(name)s:%(module)s:%(funcName)s - %(levelname)s - %(message)s' logging.basicConfig(format=__log_format) class CustomerThing(object): def __init__(self): self.__linkkit = linkkit.LinkKit( host_name="cn-shanghai", product_key="xxxxxxxx", #修改设备三元组 device_name="xxxxxxxxxxxxx", device_secret="xxxxxxxxxxxxxxxxxxxxxxx" ) self.__linkkit.enable_logger(logging.DEBUG) self.__linkkit.on_device_dynamic_register = self.on_device_dynamic_register self.__linkkit.on_connect = self.on_connect self.__linkkit.on_disconnect = self.on_disconnect self.__linkkit.on_topic_message = self.on_topic_message self.__linkkit.on_subscribe_topic = self.on_subscribe_topic self.__linkkit.on_unsubscribe_topic = self.on_unsubscribe_topic self.__linkkit.on_publish_topic = self.on_publish_topic self.__linkkit.on_thing_enable = self.on_thing_enable self.__linkkit.on_thing_disable = self.on_thing_disable self.__linkkit.on_thing_event_post = self.on_thing_event_post self.__linkkit.on_thing_prop_post = self.on_thing_prop_post self.__linkkit.on_thing_prop_changed = self.on_thing_prop_changed self.__linkkit.on_thing_call_service = self.on_thing_call_service self.__linkkit.on_thing_raw_data_post = self.on_thing_raw_data_post self.__linkkit.on_thing_raw_data_arrived = self.on_thing_raw_data_arrived self.__linkkit.thing_setup("tsl.json") self.__linkkit.config_device_info("Eth|03ACDEFF0032|Eth|03ACDEFF0031") self.__call_service_request_id = 0 def on_device_dynamic_register(self, rc, value, userdata): if rc == 0: print("dynamic register device success, value:" + value) else: print("dynamic register device fail, message:" + value) def on_connect(self, session_flag, rc, userdata): print("on_connect:%d,rc:%d,userdata:" % (session_flag, rc)) def on_disconnect(self, rc, userdata): print("on_disconnect:rc:%d,userdata:" % rc) def on_topic_message(self, topic, payload, qos, userdata): print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos)) pass def on_subscribe_topic(self, mid, granted_qos, userdata): print("on_subscribe_topic mid:%d, granted_qos:%s" % (mid, str(','.join('%s' % it for it in granted_qos)))) pass def on_unsubscribe_topic(self, mid, userdata): print("on_unsubscribe_topic mid:%d" % mid) pass def on_publish_topic(self, mid, userdata): print("on_publish_topic mid:%d" % mid) def on_thing_prop_changed(self, params, userdata): print("on_thing_prop_changed params:" + str(params)) def on_thing_enable(self, userdata): print("on_thing_enable") def on_thing_disable(self, userdata): print("on_thing_disable") def on_thing_event_post(self, event, request_id, code, data, message, userdata): print("on_thing_event_post event:%s,request id:%s, code:%d, data:%s, message:%s" % (event, request_id, code, str(data), message)) pass def on_thing_prop_post(self, request_id, code, data, message,userdata): print("on_thing_prop_post request id:%s, code:%d, data:%s message:%s" % (request_id, code, str(data), message)) def on_thing_raw_data_arrived(self, payload, userdata): print("on_thing_raw_data_arrived:%s" % str(payload)) def on_thing_raw_data_post(self, payload, userdata): print("on_thing_raw_data_post: %s" % str(payload)) def on_thing_call_service(self, identifier, request_id, params, userdata): print("on_thing_call_service identifier:%s, request id:%s, params:%s" % (identifier, request_id, params)) self.__call_service_request_id = request_id pass def user_loop(self): self.__linkkit.connect_async() tips = "1: disconnectn" + "2 connect&loopn" + "3 subscribe topicn" + "4 unsubscribe topicn" + "5 public topicn" + "" while True: # 获得接受缓冲区字符 count = ser.inWaiting() if count != 0: #读取内容并显示 recv = ser.read(count) print("receive data is : " + str(recv)+'n') #将串口接受到的数据(字节型)转换成字符串 str0 = str(recv) #对b''进行替换 str1 = re.sub("b","",str0) str2 = re.sub("'","",str1) int1 = int(str2) print(int1) #将整形变量int1上传到阿里云中,标识符为WaterLevel prop_data = { "WaterLevel": int1 } self.__linkkit.thing_post_property(prop_data) if __name__ == "__main__": #打开串口 ser = serial.Serial('/dev/ttyS0', 9600) if ser.isOpen == False: ser.open() #打开串口 custom_thing = CustomerThing() custom_thing.user_loop()

最后

以上就是灵巧水蜜桃最近收集整理的关于通过LoRa模块SX1268点对点通信,树莓派接收并发送数据到阿里云物联网平台的全部内容,更多相关通过LoRa模块SX1268点对点通信内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(109)

评论列表共有 0 条评论

立即
投稿
返回
顶部