我是靠谱客的博主 疯狂裙子,这篇文章主要介绍Apache IoTDB:使用Session方式执行操作,现在分享给大家,希望可以做个参考。

1.声明

当前内容主要为基于Session方式操作当前的IoTDB

2.pom依赖

复制代码
1
2
3
4
5
6
<!-- IoTDB Session client library used by the demo below -->
<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>iotdb-session</artifactId>
    <version>0.11.1</version>
</dependency>

3.基本demo

主要的Recordable接口

复制代码
1
2
3
4
/**
 * Contract for objects that can describe themselves as a {@link Record}.
 */
public interface Recordable {

    /**
     * Builds the {@link Record} view of this object.
     *
     * @return the record produced from this object
     */
    Record toRecord();
}

基本的Record类

复制代码
1
2
3
4
5
6
7
8
9
10
11
import java.util.List;

/**
 * Plain data holder for one row to insert: a device id, a time value, and
 * parallel lists of measurement names and their values as strings.
 */
public class Record {

    private String deviceId;
    private long time;
    private List<String> measurements;
    private List<String> values;

    /** @return the device id this row belongs to */
    public String getDeviceId() {
        return deviceId;
    }

    /** @param deviceId the device id this row belongs to */
    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    /** @return the time value of this row */
    public long getTime() {
        return time;
    }

    /** @param time the time value of this row */
    public void setTime(long time) {
        this.time = time;
    }

    /** @return the measurement names, in the same order as {@link #getValues()} */
    public List<String> getMeasurements() {
        return measurements;
    }

    /** @param measurements the measurement names, in the same order as the values */
    public void setMeasurements(List<String> measurements) {
        this.measurements = measurements;
    }

    /** @return the values as strings, in the same order as {@link #getMeasurements()} */
    public List<String> getValues() {
        return values;
    }

    /** @param values the values as strings, in the same order as the measurement names */
    public void setValues(List<String> values) {
        this.values = values;
    }
}

主要的实体类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.List; public class Machine implements Recordable { private String name; private Float temperature; private Boolean status; private Date timestamp; public Date getTimestamp() { return timestamp; } public void setTimestamp(Date timestamp) { this.timestamp = timestamp; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Float getTemperature() { return temperature; } public void setTemperature(Float temperature) { this.temperature = temperature; } public Boolean getStatus() { return status; } @Override public String toString() { return "Machine [name=" + name + ", temperature=" + temperature + ", status=" + status + "]"; } public void setStatus(Boolean status) { this.status = status; } public Machine() { super(); // TODO Auto-generated constructor stub } public Machine(String name, Float temperature, Boolean status) { super(); this.name = name; this.temperature = temperature; this.status = status; this.timestamp = new Date(); } public Record toRecord() { // TODO Auto-generated method stub DateFormat format = new SimpleDateFormat("yyyyMMddHHmmssSSS"); Record record = new Record(); record.setDeviceId("root.test.machine"); record.setTime(Long.parseLong(format.format(this.getTimestamp()))); record.setMeasurements(Arrays.asList("name", "temperature", "status")); record.setValues(Arrays.asList(this.getName(), this.getTemperature().toString(), this.getStatus().toString())); return record; } }

主要的测试类:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; import org.apache.iotdb.session.SessionDataSet; import org.apache.iotdb.session.SessionUtils; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.RowRecord; import com.hy.springboot.iotdb.Machine; import com.hy.springboot.iotdb.Record; import com.hy.springboot.iotdb.Recordable; /** * * @author hy * @createTime 2021-03-06 15:04:38 * @description 当前内容为主要使用iotdb的session方式操作iotdb时序数据库(存在问题,无法搭建iotDB集群问题,很严重) * */ public class IOTDBSessionConnectionTest { private static final String host = "192.168.1.101"; private static final int rpcPort = 6667; private static final String username = "root"; private static final String password = "root"; public static void main(String[] args) throws InterruptedException { Session session = new Session(host, rpcPort, username, password); try { session.open(); String storageGroupId = "root.test"; // 1.创建一个存储组 session.setStorageGroup(storageGroupId); // 2.创建一个时序 session.createTimeseries(storageGroupId + ".machine.name", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.LZ4); session.createTimeseries(storageGroupId + ".machine.temperature", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.LZ4); session.createTimeseries(storageGroupId + ".machine.status", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.LZ4); // 3.开始添加数据,这里需要休眠,否则数据都是一起的 List<Machine> machines = new ArrayList<Machine>(); machines.add(new Machine("机器1", 400.5f, true)); Thread.sleep(100); machines.add(new Machine("机器1", 380.5f, true)); Thread.sleep(100); 
machines.add(new Machine("机器1", 420.5f, true)); Thread.sleep(100); machines.add(new Machine("机器1", 390.5f, true)); Thread.sleep(100); // 4.开始添加数据 // session.insertRecord("root.test.machine", new Date().getTime(), measurements, // values); session.insertRecords(getDeviceIds(machines), getTimes(machines), getMeasurementsList(machines), getValuesList(machines)); //session.insertTablet(tablet); String sql = String.format("select * from %s.%s", storageGroupId, "machine"); SessionDataSet sessionDataSet = session.executeQueryStatement(sql); int fetchSize = sessionDataSet.getFetchSize(); List<String> columnNames = sessionDataSet.getColumnNames(); List<TSDataType> columnTypes = sessionDataSet.getColumnTypes(); System.out.println(columnNames); System.out.println(columnTypes); if (fetchSize > 0) { while (sessionDataSet.hasNext()) { RowRecord next = sessionDataSet.next(); List<Field> fields = next.getFields(); // 查询结果第一个为时间搓 long timestamp = next.getTimestamp(); System.out.println(timestamp + "t"); for (int i = 0; i < fields.size(); i++) { Field field = fields.get(i); // 这里的需要按照类型获取 System.out.println(field.getObjectValue(field.getDataType())); } System.out.println(); } } sessionDataSet.closeOperationHandle(); // 删除当前的storage group sql = String.format("delete storage grup %s", storageGroupId); session.executeNonQueryStatement(sql); } catch (IoTDBConnectionException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (StatementExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { try { session.close(); } catch (IoTDBConnectionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } private static List<String> getDeviceIds(List<? 
extends Recordable> records) { List<String> deviceIds = new ArrayList<String>(); for (Recordable recordable : records) { Record record = recordable.toRecord(); String deviceId = record.getDeviceId(); deviceIds.add(deviceId); } return deviceIds; } private static List<Long> getTimes(List<? extends Recordable> records) { List<Long> times = new ArrayList<Long>(); for (Recordable recordable : records) { Record record = recordable.toRecord(); times.add(record.getTime()); } return times; } private static List<List<String>> getMeasurementsList(List<? extends Recordable> records) { List<List<String>> measurementsList = new ArrayList<List<String>>(); for (Recordable recordable : records) { Record record = recordable.toRecord(); measurementsList.add(record.getMeasurements()); } return measurementsList; } private static List<List<String>> getValuesList(List<? extends Recordable> records) { List<List<String>> valuesList = new ArrayList<List<String>>(); for (Recordable recordable : records) { Record record = recordable.toRecord(); valuesList.add(record.getValues()); } return valuesList; } }

执行结果:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[Time, root.test.machine.name, root.test.machine.temperature, root.test.machine.status] [INT64, TEXT, FLOAT, BOOLEAN] 20210307084508369 机器1 400.5 true 20210307084508479 机器1 380.5 true 20210307084508589 机器1 420.5 true 20210307084508698 机器1 390.5 true

4.总结

1.在创建timeseries的时候还是需要填写完整路径:root.存储组.设备.字段,例如 root.test.machine.name

2.感觉session方式就是一种直接解析方式,不需要解析sql

3.通过查看官方文档,发现官方文档与实际发布的内容有些不匹配:无法搭建集群,也就无法测试集群;并且发行版中缺少一些启动脚本或配置文件(时间2021/03/07)

最后

以上就是疯狂裙子最近收集整理的关于Apache IoTDB:使用Session方式执行操作的全部内容,更多相关Apache内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(94)

评论列表共有 0 条评论

立即
投稿
返回
顶部