我是靠谱客的博主 难过电脑,这篇文章主要介绍Thingsboard源码全解析(一)_启动类说明,现在分享给大家,希望可以做个参考。

以前也了解过Thingsboard, 感觉挺好的, 最起码100%开源, 源码直接下载, 但是呢网上的源码解析比较少, 借此机会也来贡献一波,以下是官网扒下来的整体架构图

  • TB组件: 具备水平可扩展和容错特性
  • 消息队列: Kafka, RabbitMQ, AWS SQS, Azure Service Bus, Google Pub/Sub
  • 数据库: PostgreSQL 及 Timescale/Cassandra(NoSQL)
  • 设备网关: 支持Modbus, OPC-UA, BLE, CAN, MQTT, HTTP及其他协议

1.设备接入(tb-transport-mqtt)

1.1 MqttTransportService启动类

其实我也不知道它那些代码项目包结构是按照什么去逻辑去架构的, 咱们从设备接入(tb-transport-mqtt)开始研究. 做为一个接入服务, Thingsboard是基于Netty去开发的, 以下是启动代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MqttTransportService { ..... @Autowired private MqttTransportContext context; @PostConstruct public void init() throws Exception { log.info("Setting resource leak detector level to {}", ); ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase())); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MqttTransportServerInitializer(context)) .childOption(ChannelOption.SO_KEEPALIVE, keepAlive); } ..... }

一个很常规的启动代码, ResourceLeakDetector是资源泄露检测等级, 这里不细说了. Netty实现ByteBuf的资源回收还有2种做法,

  • 手动调用ReferenceCountUtil.release(bytebuf)
  • Handler继承于SimpleChannelInboundHandler, 它有个autoRelease属性, 2种方式实现方式是一样的

MqttTransportContext见名知意, 整个传输层的上下文对象, 并且是单例的, 可见其重要程度, 既然这么重要先看下这个类里面有什么东西.

1.2 MqttTransportContext

先看下这个类的继承结构和属性
在这里插入图片描述
首先看MqttTransportContext类里面的属性

1.2.1 slHandlerProvider: MqttSslHandlerProvider提供了getSslHandler方法,获取一个SslHandler, 这个handler是Netty封装的针对于处理SSL/TLS加密数据的。其继承了ChannelInboundHandlerAdapter和实现了ChannelOutboundHandler,所以在数据流入和输出时会对数据进行解密和加密。

加密配置在这个文件tb-mqtt-transport.yml, 内容如下

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# MQTT server parameters transport: mqtt: ssl: # Enable/disable SSL support enabled: "${MQTT_SSL_ENABLED:false}" # SSL protocol: See protocol: "${MQTT_SSL_PROTOCOL:TLSv1.2}" # Path to the key store that holds the SSL certificate key_store: "${MQTT_SSL_KEY_STORE:mqttserver.jks}" # Password used to access the key store key_store_password: "${MQTT_SSL_KEY_STORE_PASSWORD:server_ks_password}" # Password used to access the key key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" # Type of the key store key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"

当 transport.mqtt.ssl.enabled = true, MqttSslHandlerProvider就会被加载到Spring容器中

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Component("MqttSslHandlerProvider") @ConditionalOnProperty(prefix = "transport.mqtt.ssl", value = "enabled", havingValue = "true", matchIfMissing = false) public class MqttSslHandlerProvider { @Value("${transport.mqtt.ssl.protocol}") private String sslProtocol; @Value("${transport.mqtt.ssl.key_store}") private String keyStoreFile; @Value("${transport.mqtt.ssl.key_store_password}") private String keyStorePassword; @Value("${transport.mqtt.ssl.key_password}") private String keyPassword; @Value("${transport.mqtt.ssl.key_store_type}") private String keyStoreType; public SslHandler getSslHandler() { try { ..... SSLContext sslContext = SSLContext.getInstance(sslProtocol); sslContext.init(km, tm, null); SSLEngine sslEngine = sslContext.createSSLEngine(); sslEngine.setUseClientMode(false); // 单向校验, 服务器一般不会对客户端进行握手校验,校验的话也是在上层进行校验 sslEngine.setNeedClientAuth(false); // 单向校验 sslEngine.setWantClientAuth(true); sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols()); sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites()); sslEngine.setEnableSessionCreation(true); return new SslHandler(sslEngine); } catch (Exception e) { log.error("Unable to set up SSL context. Reason: " + e.getMessage(), e); throw new RuntimeException("Failed to get SSL handler", e); } } .......

SSL/TLS虽然是加密传输,但是也要注意防止SSL劫持和SSL剥离攻击

1.2.2 MqttTransportAdaptor : 客户端消息通过MqttDecoder将消息转成MqttMessage类型,而MqttMessage对象中的payload是一个Obj类型,所以就需要MqttTransportAdaptor将payload转成一个具体的类型的对象。

示例代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { try { if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { TransportProtos.PostTelemetryMsg postTelemetryMsg = adaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg); transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) { TransportProtos.PostAttributeMsg postAttributeMsg = adaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg); transportService.process(sessionInfo, postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg)); } ... } else { transportService.reportActivity(sessionInfo); } } catch (AdaptorException e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); ctx.close(); } }

MqttTransportAdaptor会将MqttPublishMessage对象中的payload转成一个具体的msg对象。

1.3 TransportContext

TransportContext类包含了一些上下文信息类该有的关键属性,方法。属性指的是TbServiceInfoProvider类,它里面包含了整个服务的ServiceId, serviceType, tenantId等基础信息。方法指的是TransportService,该类主要提供了设备认证,发布消息处理,订阅消息处理,DeviceSession注册等功能的能力,我们可以理解为MqttTransportHandler是一个消息入口,那么TransportService就是一个具体功能实现类。

Thingsborad源码中定义了TransportContext,将整个设备会话过程的连接,消息上下行,会话处理等等设备整个生命周期内的活动统一抽象成了一个Context,不同协议实现不同的Context。这里和很多其他开源框架的思想有点类似。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected final ObjectMapper mapper = new ObjectMapper(); @Autowired private TransportService transportService; @Autowired private TbServiceInfoProvider serviceInfoProvider; @Getter private ExecutorService executor; @PostConstruct public void init() { executor = Executors.newWorkStealingPool(50); } @PreDestroy public void stop() { if (executor != null) { executor.shutdownNow(); } } public String getNodeId() { return serviceInfoProvider.getServiceId();

目前启动类中的内容介绍完毕,后面模拟一个设备继续探究其源码处理过程,当然也包括Thingsboard的一些核心概念剖析,再结合业内知名的物联网服务提供商(Link Kit/)的解决方案进行对比对比。

最后

以上就是难过电脑最近收集整理的关于Thingsboard源码全解析(一)_启动类说明的全部内容,更多相关Thingsboard源码全解析(一)_启动类说明内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(98)

评论列表共有 0 条评论

立即
投稿
返回
顶部