我是靠谱客的博主 可靠飞机,这篇文章主要介绍数据仓库 - 树形结构的维表开发实践一、概述二、GreenPlum处理方案三、Spark GraphX 实现方案四、备注,现在分享给大家,希望可以做个参考。

一、概述

根据星型模型的概念,不存在渐变维度,数据存在冗,典型例子地域维度表,如国家,省,市这种树形数据结构。

OLTP数据结构:

idpidname
1 中国
21广东省
32深圳

期望的星型模型数据结构:

idcountryprovincecitylevel
1中国UNKNOWNUNKNOWN1
2中国广东省UNKNOWN2
3中国广东省深圳3

二、GreenPlum处理方案

gp底层是PostgreSQL, 支持递归,实现如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
WITH RECURSIVE cte AS ( SELECT "id", "name", 1 AS "level" FROM t_location UNION ALL SELECT "c"."id", ( "p"."name" || '/' || "c"."name" ) :: VARCHAR (255) AS "name", "p"."level" + 1 AS "level" FROM cte AS "p" JOIN t_location AS "c" ON "c"."pid" = "p"."id" ) SELECT "id", split_part("name", '/', 1) AS country, split_part("name", '/', 2) AS province, split_part("name", '/', 3) AS city, "level" FROM ( SELECT "c"."id", "c"."name", "c"."level" FROM cte "c" INNER JOIN ( SELECT "id", MAX ("level") AS "lv" FROM cte GROUP BY "id" ) AS "d" ON "c"."id" = "d"."id" AND "c"."level" = "d"."lv" ) AS "t"

源数据:

idpidname
1 中国
21广东省
32深圳
42珠海
51福建省
65厦门

输出:

idcountryprovincecitylevel
1中国  1
2中国广东省 2
5中国福建省 2
3中国广东省深圳3
4中国广东省珠海3
6中国福建省厦门3

三、Spark GraphX 实现方案

如果数据在Hadoop环境下,由于spark sql不支持recursive, 经过一番调研,网上找到了Spark GraphX的解决方案,主要利用Pregel接口。Pregel 是一个迭代图处理模型,由谷歌开发,它使用顶点之间的消息传递进行一系列的迭代。GraphX 实现了类似 Pregel 块同步消息传递 API。Pregel的步骤是顶点接收上一个super step的消息,基于消息计算新值赋予顶点,并发送消息给相连的下一个super step。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package net.demo; import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.graphx._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql.DataFrame // 消息 case class VertexMessage( initVertexId: Long, // 起始顶点的ID,用于判断图是否存在环 level: Int, // 层级 path: List[String], // 每层的名称集合 isCyclic: Boolean, // 是否存在环 isLeaf: Boolean // 是否叶子结点 ) // case class VertexValue( id: Long, // 结点ID name: String, // 名称 initVertexId: Long, // 起始顶点ID level: Int, // 默认0 path: List[String], // 名称集合 isCyclic: Boolean, // 是否有环,默认false isLeaf: Boolean // 是否叶子结点,默认true ) case class MyVertex( id: Long, name: String ) object HierarchyToSingleRow { // 计算顶点新值 def vprog( vertexId: VertexId, // 当前顶点ID value: VertexValue, // 当前顶点值 message: VertexMessage // 上一超步发送的消息 ): VertexValue = { if (message.level == 0) { // 初始级 value.copy(level = value.level + 1) } else if (message.isCyclic) { value.copy(isCyclic = true) } else if (!message.isLeaf) { value.copy(isLeaf = false) } else { value.copy( initVertexId = message.initVertexId, level = value.level + 1, path = value.name :: message.path ) } } // src -- edge -- dst // triplet: src.id, dst.id, src.attr, e.attr, dst.attr def sendMsg( triplet: EdgeTriplet[VertexValue, String] ): Iterator[(VertexId, VertexMessage)] = { val src = triplet.srcAttr val dst = triplet.dstAttr // 判断是否有环 if (src.initVertexId == triplet.dstId || src.initVertexId == dst.initVertexId) { // 有环 if (!src.isCyclic) { // 设为有环 Iterator( ( triplet.dstId, VertexMessage( initVertexId = src.initVertexId, level = src.level, path = src.path, isCyclic = true, isLeaf = src.isLeaf ) ) ) } else { // false 则忽略 Iterator.empty } } else { // 无环,继续 if (src.isLeaf) { // 初始化时所有的结点都是叶子结点,两个顶点存在边,那么src顶点就是不是叶子结点 Iterator( ( triplet.srcId, VertexMessage( initVertexId = src.initVertexId, level = src.level, path = src.path, isCyclic = false, isLeaf = false // 非常重要 ) ) ) } else { // 给dst结点设置新值 Iterator( ( triplet.dstId, VertexMessage( initVertexId = src.initVertexId, level = src.level, path = src.path, isCyclic = false, // Set to false so that cyclic updating is ignored in vprog isLeaf = true // Set to true so that leaf updating is ignored in vprog ) ) ) } } } // 合并函数 def mergeMsg(msg1: VertexMessage, msg2: VertexMessage): VertexMessage = msg2 def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("Spark GraphX Example").getOrCreate(); import spark.implicits._ // 读取数据 val locationDF = spark.read .csv("/user/spark/location.csv") .toDF("id", "pid", "name") .cache() // 构建顶点RDD val verticesRdd: RDD[(VertexId, MyVertex)] = locationDF .withColumn("id", $"id".cast("Long")) .select( $"id", $"name" ) .rdd .map(loc => (loc.getLong(0), MyVertex(loc.getLong(0), loc.getString(1)))) // 构建边,确定顶点指向顶点的方向,本用例 pid -> id val edgesRd: RDD[Edge[String]] = locationDF .filter( $"pid".isNotNull ) .withColumn("id", $"id".cast("Long")) .withColumn("pid", $"pid".cast("Long")) .select( $"pid", $"id" ) .rdd .map(loc => Edge(loc.getLong(0), loc.getLong(1), "top-down")) // top-down 是edge的属性,在本用例中没有用途 // 构建图 val graph = Graph(verticesRdd, edgesRd).cache() // 初始化图中各顶点的值 val valueGraph = graph.mapVertices { (id, v) => VertexValue( id = v.id, name = v.name, initVertexId = id, level = 0, path = List(v.name), isCyclic = false, isLeaf = false ) } // 初始消息 val initialMsg = VertexMessage( initVertexId = 0L, level = 0, path = Nil, isCyclic = false, isLeaf = true ) val results = valueGraph.pregel( initialMsg, // 初始消息 Int.MaxValue, // 迭代次数 EdgeDirection.Out // 往边的哪个方向输出消息 )( vprog, // 计算函数 sendMsg, // 发送消息函数 mergeMsg // 合并函数 ) // 转换 val df = results.vertices .map { case (id, v) => ( id, v.level, v.path.reverse.mkString("/") ) } .toDF("id", "level", "path") df.withColumn("_tmp", split($"path", "/")) .select( $"id", $"_tmp".getItem(0).as("country"), $"_tmp".getItem(1).as("province"), $"_tmp".getItem(2).as("city"), $"level" ) .show spark.stop } }

location.csv

复制代码
1
2
3
4
5
6
1,,中国 2,1,广东省 3,2,深圳 4,2,珠海 5,1,福建省 6,5,厦门

输出:

idcountryprovincecitylevel
1中国nullnull1
2中国广东省null2
5中国福建省null2
3中国广东省深圳3
4中国广东省珠海3
6中国福建省厦门3

四、备注

代码注释不一定正确,部分是个人理解,如有错误,欢迎指正。

参考文献:

http://spark.apache.org/docs/latest/graphx-programming-guide.html

https://blog.csdn.net/qq_38265137/article/details/80547763

https://james.faeldon.com/generating-descendant-table-using-spark-graph-x#generating-descendant-table-using-spark-graphx

https://www.jianshu.com/p/d9170a0723e4

最后

以上就是可靠飞机最近收集整理的关于数据仓库 - 树形结构的维表开发实践一、概述二、GreenPlum处理方案三、Spark GraphX 实现方案四、备注的全部内容,更多相关数据仓库内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(95)

评论列表共有 0 条评论

立即
投稿
返回
顶部