一、概述
根据星型模型的概念,不存在渐变维度,数据存在冗,典型例子地域维度表,如国家,省,市这种树形数据结构。
OLTP数据结构:
id | pid | name |
1 | 中国 | |
2 | 1 | 广东省 |
3 | 2 | 深圳 |
期望的星型模型数据结构:
id | country | province | city | level |
1 | 中国 | UNKNOWN | UNKNOWN | 1 |
2 | 中国 | 广东省 | UNKNOWN | 2 |
3 | 中国 | 广东省 | 深圳 | 3 |
二、GreenPlum处理方案
gp底层是PostgreSQL, 支持递归,实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43WITH RECURSIVE cte AS ( SELECT "id", "name", 1 AS "level" FROM t_location UNION ALL SELECT "c"."id", ( "p"."name" || '/' || "c"."name" ) :: VARCHAR (255) AS "name", "p"."level" + 1 AS "level" FROM cte AS "p" JOIN t_location AS "c" ON "c"."pid" = "p"."id" ) SELECT "id", split_part("name", '/', 1) AS country, split_part("name", '/', 2) AS province, split_part("name", '/', 3) AS city, "level" FROM ( SELECT "c"."id", "c"."name", "c"."level" FROM cte "c" INNER JOIN ( SELECT "id", MAX ("level") AS "lv" FROM cte GROUP BY "id" ) AS "d" ON "c"."id" = "d"."id" AND "c"."level" = "d"."lv" ) AS "t"
源数据:
id | pid | name |
1 | 中国 | |
2 | 1 | 广东省 |
3 | 2 | 深圳 |
4 | 2 | 珠海 |
5 | 1 | 福建省 |
6 | 5 | 厦门 |
输出:
id | country | province | city | level |
1 | 中国 | 1 | ||
2 | 中国 | 广东省 | 2 | |
5 | 中国 | 福建省 | 2 | |
3 | 中国 | 广东省 | 深圳 | 3 |
4 | 中国 | 广东省 | 珠海 | 3 |
6 | 中国 | 福建省 | 厦门 | 3 |
三、Spark GraphX 实现方案
如果数据在Hadoop环境下,由于spark sql不支持recursive, 经过一番调研,网上找到了Spark GraphX的解决方案,主要利用Pregel接口。Pregel 是一个迭代图处理模型,由谷歌开发,它使用顶点之间的消息传递进行一系列的迭代。GraphX 实现了类似 Pregel 块同步消息传递 API。Pregel的步骤是顶点接收上一个super step的消息,基于消息计算新值赋予顶点,并发送消息给相连的下一个super step。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196package net.demo; import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.graphx._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql.DataFrame // 消息 case class VertexMessage( initVertexId: Long, // 起始顶点的ID,用于判断图是否存在环 level: Int, // 层级 path: List[String], // 每层的名称集合 isCyclic: Boolean, // 是否存在环 isLeaf: Boolean // 是否叶子结点 ) // case class VertexValue( id: Long, // 结点ID name: String, // 名称 initVertexId: Long, // 起始顶点ID level: Int, // 默认0 path: List[String], // 名称集合 isCyclic: Boolean, // 是否有环,默认false isLeaf: Boolean // 是否叶子结点,默认true ) case class MyVertex( id: Long, name: String ) object HierarchyToSingleRow { // 计算顶点新值 def vprog( vertexId: VertexId, // 当前顶点ID value: VertexValue, // 当前顶点值 message: VertexMessage // 上一超步发送的消息 ): VertexValue = { if (message.level == 0) { // 初始级 value.copy(level = value.level + 1) } else if (message.isCyclic) { value.copy(isCyclic = true) } else if (!message.isLeaf) { value.copy(isLeaf = false) } else { value.copy( initVertexId = message.initVertexId, level = value.level + 1, path = value.name :: message.path ) } } // src -- edge -- dst // triplet: src.id, dst.id, src.attr, e.attr, dst.attr def sendMsg( triplet: EdgeTriplet[VertexValue, String] ): Iterator[(VertexId, VertexMessage)] = { val src = triplet.srcAttr val dst = triplet.dstAttr // 判断是否有环 if (src.initVertexId == triplet.dstId || src.initVertexId == dst.initVertexId) { // 有环 if (!src.isCyclic) { // 设为有环 Iterator( ( triplet.dstId, VertexMessage( initVertexId = src.initVertexId, level = src.level, path = src.path, isCyclic = true, isLeaf = src.isLeaf ) ) ) } else { // false 则忽略 Iterator.empty } } else { // 无环,继续 if (src.isLeaf) { // 初始化时所有的结点都是叶子结点,两个顶点存在边,那么src顶点就是不是叶子结点 Iterator( ( triplet.srcId, VertexMessage( initVertexId = src.initVertexId, level = src.level, path = src.path, isCyclic = false, isLeaf = false // 非常重要 ) ) ) } else { // 给dst结点设置新值 Iterator( ( triplet.dstId, VertexMessage( initVertexId = src.initVertexId, level = src.level, path = src.path, isCyclic = false, // Set to false so that cyclic updating is ignored in vprog isLeaf = true // Set to true so that leaf updating is ignored in vprog ) ) ) } } } // 合并函数 def mergeMsg(msg1: VertexMessage, msg2: VertexMessage): VertexMessage = msg2 def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("Spark GraphX Example").getOrCreate(); import spark.implicits._ // 读取数据 val locationDF = spark.read .csv("/user/spark/location.csv") .toDF("id", "pid", "name") .cache() // 构建顶点RDD val verticesRdd: RDD[(VertexId, MyVertex)] = locationDF .withColumn("id", $"id".cast("Long")) .select( $"id", $"name" ) .rdd .map(loc => (loc.getLong(0), MyVertex(loc.getLong(0), loc.getString(1)))) // 构建边,确定顶点指向顶点的方向,本用例 pid -> id val edgesRd: RDD[Edge[String]] = locationDF .filter( $"pid".isNotNull ) .withColumn("id", $"id".cast("Long")) .withColumn("pid", $"pid".cast("Long")) .select( $"pid", $"id" ) .rdd .map(loc => Edge(loc.getLong(0), loc.getLong(1), "top-down")) // top-down 是edge的属性,在本用例中没有用途 // 构建图 val graph = Graph(verticesRdd, edgesRd).cache() // 初始化图中各顶点的值 val valueGraph = graph.mapVertices { (id, v) => VertexValue( id = v.id, name = v.name, initVertexId = id, level = 0, path = List(v.name), isCyclic = false, isLeaf = false ) } // 初始消息 val initialMsg = VertexMessage( initVertexId = 0L, level = 0, path = Nil, isCyclic = false, isLeaf = true ) val results = valueGraph.pregel( initialMsg, // 初始消息 Int.MaxValue, // 迭代次数 EdgeDirection.Out // 往边的哪个方向输出消息 )( vprog, // 计算函数 sendMsg, // 发送消息函数 mergeMsg // 合并函数 ) // 转换 val df = results.vertices .map { case (id, v) => ( id, v.level, v.path.reverse.mkString("/") ) } .toDF("id", "level", "path") df.withColumn("_tmp", split($"path", "/")) .select( $"id", $"_tmp".getItem(0).as("country"), $"_tmp".getItem(1).as("province"), $"_tmp".getItem(2).as("city"), $"level" ) .show spark.stop } }
location.csv
1
2
3
4
5
61,,中国 2,1,广东省 3,2,深圳 4,2,珠海 5,1,福建省 6,5,厦门
输出:
id | country | province | city | level |
1 | 中国 | null | null | 1 |
2 | 中国 | 广东省 | null | 2 |
5 | 中国 | 福建省 | null | 2 |
3 | 中国 | 广东省 | 深圳 | 3 |
4 | 中国 | 广东省 | 珠海 | 3 |
6 | 中国 | 福建省 | 厦门 | 3 |
四、备注
代码注释不一定正确,部分是个人理解,如有错误,欢迎指正。
参考文献:
http://spark.apache.org/docs/latest/graphx-programming-guide.html
https://blog.csdn.net/qq_38265137/article/details/80547763
https://james.faeldon.com/generating-descendant-table-using-spark-graph-x#generating-descendant-table-using-spark-graphx
https://www.jianshu.com/p/d9170a0723e4
最后
以上就是可靠飞机最近收集整理的关于数据仓库 - 树形结构的维表开发实践一、概述二、GreenPlum处理方案三、Spark GraphX 实现方案四、备注的全部内容,更多相关数据仓库内容请搜索靠谱客的其他文章。
发表评论 取消回复