样例对象
package com.dmp.beans

import com.dmp.utils.NumFormat

/**
 * One parsed ad-log record with 85 fields, in raw-log column order.
 *
 * This is a plain class that implements [[Product]] by hand instead of a case class.
 * That is presumably to avoid the 22-field case-class limit of older Scala versions
 * (TODO confirm against the project's Scala version). Being a `Product` is what allows
 * it to be passed to Spark's `SQLContext.createDataFrame`.
 *
 * The constructor parameter order must stay in sync with the indices used in
 * `productElement` and in `Log.apply`.
 *
 * Note: `equals`/`hashCode` are not overridden, so equality is still reference
 * equality; `canEqual` only reports type compatibility.
 */
class Log(val sessionid: String,
          val advertisersid: Int,
          val adorderid: Int,
          val adcreativeid: Int,
          val adplatformproviderid: Int,
          val sdkversion: String,
          val adplatformkey: String,
          val putinmodeltype: Int,
          val requestmode: Int,
          val adprice: Double,
          val adppprice: Double,
          val requestdate: String,
          val ip: String,
          val appid: String,
          val appname: String,
          val uuid: String,
          val device: String,
          val client: Int,
          val osversion: String,
          val density: String,
          val pw: Int,
          val ph: Int,
          val long: String,
          val lat: String,
          val provincename: String,
          val cityname: String,
          val ispid: Int,
          val ispname: String,
          val networkmannerid: Int,
          val networkmannername: String,
          val iseffective: Int,
          val isbilling: Int,
          val adspacetype: Int,
          val adspacetypename: String,
          val devicetype: Int,
          val processnode: Int,
          val apptype: Int,
          val district: String,
          val paymode: Int,
          val isbid: Int,
          val bidprice: Double,
          val winprice: Double,
          val iswin: Int,
          val cur: String,
          val rate: Double,
          val cnywinprice: Double,
          val imei: String,
          val mac: String,
          val idfa: String,
          val openudid: String,
          val androidid: String,
          val rtbprovince: String,
          val rtbcity: String,
          val rtbdistrict: String,
          val rtbstreet: String,
          val storeurl: String,
          val realip: String,
          val isqualityapp: Int,
          val bidfloor: Double,
          val aw: Int,
          val ah: Int,
          val imeimd5: String,
          val macmd5: String,
          val idfamd5: String,
          val openudidmd5: String,
          val androididmd5: String,
          val imeisha1: String,
          val macsha1: String,
          val idfasha1: String,
          val openudidsha1: String,
          val androididsha1: String,
          val uuidunknow: String,
          val userid: String,
          val iptype: Int,
          val initbidprice: Double,
          val adpayment: Double,
          val agentrate: Double,
          val lomarkrate: Double,
          val adxrate: Double,
          val title: String,
          val keywords: String,
          val tagid: String,
          val callbackdate: String,
          val channelid: String,
          val mediatype: Int) extends Product with Serializable {

  /**
   * Maps a 0-based field index (constructor order) to that field's value.
   *
   * @throws IndexOutOfBoundsException if `n` is outside `0 until productArity`,
   *                                   as required by the [[Product]] contract
   */
  override def productElement(n: Int): Any = n match {
    case 0 => sessionid
    case 1 => advertisersid
    case 2 => adorderid
    case 3 => adcreativeid
    case 4 => adplatformproviderid
    case 5 => sdkversion
    case 6 => adplatformkey
    case 7 => putinmodeltype
    case 8 => requestmode
    case 9 => adprice
    case 10 => adppprice
    case 11 => requestdate
    case 12 => ip
    case 13 => appid
    case 14 => appname
    case 15 => uuid
    case 16 => device
    case 17 => client
    case 18 => osversion
    case 19 => density
    case 20 => pw
    case 21 => ph
    case 22 => long
    case 23 => lat
    case 24 => provincename
    case 25 => cityname
    case 26 => ispid
    case 27 => ispname
    case 28 => networkmannerid
    case 29 => networkmannername
    case 30 => iseffective
    case 31 => isbilling
    case 32 => adspacetype
    case 33 => adspacetypename
    case 34 => devicetype
    case 35 => processnode
    case 36 => apptype
    case 37 => district
    case 38 => paymode
    case 39 => isbid
    case 40 => bidprice
    case 41 => winprice
    case 42 => iswin
    case 43 => cur
    case 44 => rate
    case 45 => cnywinprice
    case 46 => imei
    case 47 => mac
    case 48 => idfa
    case 49 => openudid
    case 50 => androidid
    case 51 => rtbprovince
    case 52 => rtbcity
    case 53 => rtbdistrict
    case 54 => rtbstreet
    case 55 => storeurl
    case 56 => realip
    case 57 => isqualityapp
    case 58 => bidfloor
    case 59 => aw
    case 60 => ah
    case 61 => imeimd5
    case 62 => macmd5
    case 63 => idfamd5
    case 64 => openudidmd5
    case 65 => androididmd5
    case 66 => imeisha1
    case 67 => macsha1
    case 68 => idfasha1
    case 69 => openudidsha1
    case 70 => androididsha1
    case 71 => uuidunknow
    case 72 => userid
    case 73 => iptype
    case 74 => initbidprice
    case 75 => adpayment
    case 76 => agentrate
    case 77 => lomarkrate
    case 78 => adxrate
    case 79 => title
    case 80 => keywords
    case 81 => tagid
    case 82 => callbackdate
    case 83 => channelid
    case 84 => mediatype
    // Without this, an out-of-range index would surface as a scala.MatchError.
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }

  /** Number of fields exposed through the Product interface (indices 0..84). */
  override def productArity: Int = 85

  /** Any other `Log` instance is considered type-compatible for equality checks. */
  override def canEqual(that: Any): Boolean = that.isInstanceOf[Log]
}

/** Companion object: factory that turns the split columns of one raw log line into a [[Log]]. */
object Log {

  /**
   * Builds a [[Log]] from the columns of one raw log line.
   *
   * Column `i` of `arr` becomes constructor parameter `i`. String columns are used as-is.
   * Numeric columns go through `NumFormat.toInt` / `NumFormat.toDouble`; how those treat
   * blank or malformed text is defined in com.dmp.utils.NumFormat, which is not visible here.
   *
   * @param arr the split columns of one log line; must contain at least 85 entries
   *            (entries beyond index 84 are ignored)
   * @return the populated record
   * @throws IllegalArgumentException if `arr` has fewer than 85 entries
   */
  def apply(arr: Array[String]): Log = {
    require(arr.length >= 85, s"Log line must have at least 85 columns, got ${arr.length}")
    new Log(
      arr(0),                       // sessionid
      NumFormat.toInt(arr(1)),      // advertisersid
      NumFormat.toInt(arr(2)),      // adorderid
      NumFormat.toInt(arr(3)),      // adcreativeid
      NumFormat.toInt(arr(4)),      // adplatformproviderid
      arr(5),                       // sdkversion
      arr(6),                       // adplatformkey
      NumFormat.toInt(arr(7)),      // putinmodeltype
      NumFormat.toInt(arr(8)),      // requestmode
      NumFormat.toDouble(arr(9)),   // adprice
      NumFormat.toDouble(arr(10)),  // adppprice
      arr(11),                      // requestdate
      arr(12),                      // ip
      arr(13),                      // appid
      arr(14),                      // appname
      arr(15),                      // uuid
      arr(16),                      // device
      NumFormat.toInt(arr(17)),     // client
      arr(18),                      // osversion
      arr(19),                      // density
      NumFormat.toInt(arr(20)),     // pw
      NumFormat.toInt(arr(21)),     // ph
      arr(22),                      // long
      arr(23),                      // lat
      arr(24),                      // provincename
      arr(25),                      // cityname
      NumFormat.toInt(arr(26)),     // ispid
      arr(27),                      // ispname
      NumFormat.toInt(arr(28)),     // networkmannerid
      arr(29),                      // networkmannername
      NumFormat.toInt(arr(30)),     // iseffective
      NumFormat.toInt(arr(31)),     // isbilling
      NumFormat.toInt(arr(32)),     // adspacetype
      arr(33),                      // adspacetypename
      NumFormat.toInt(arr(34)),     // devicetype
      NumFormat.toInt(arr(35)),     // processnode
      NumFormat.toInt(arr(36)),     // apptype
      arr(37),                      // district
      NumFormat.toInt(arr(38)),     // paymode
      NumFormat.toInt(arr(39)),     // isbid
      NumFormat.toDouble(arr(40)),  // bidprice
      NumFormat.toDouble(arr(41)),  // winprice
      NumFormat.toInt(arr(42)),     // iswin
      arr(43),                      // cur
      NumFormat.toDouble(arr(44)),  // rate
      NumFormat.toDouble(arr(45)),  // cnywinprice
      arr(46),                      // imei
      arr(47),                      // mac
      arr(48),                      // idfa
      arr(49),                      // openudid
      arr(50),                      // androidid
      arr(51),                      // rtbprovince
      arr(52),                      // rtbcity
      arr(53),                      // rtbdistrict
      arr(54),                      // rtbstreet
      arr(55),                      // storeurl
      arr(56),                      // realip
      NumFormat.toInt(arr(57)),     // isqualityapp
      NumFormat.toDouble(arr(58)),  // bidfloor
      NumFormat.toInt(arr(59)),     // aw
      NumFormat.toInt(arr(60)),     // ah
      arr(61),                      // imeimd5
      arr(62),                      // macmd5
      arr(63),                      // idfamd5
      arr(64),                      // openudidmd5
      arr(65),                      // androididmd5
      arr(66),                      // imeisha1
      arr(67),                      // macsha1
      arr(68),                      // idfasha1
      arr(69),                      // openudidsha1
      arr(70),                      // androididsha1
      arr(71),                      // uuidunknow
      arr(72),                      // userid
      NumFormat.toInt(arr(73)),     // iptype
      NumFormat.toDouble(arr(74)),  // initbidprice
      NumFormat.toDouble(arr(75)),  // adpayment
      NumFormat.toDouble(arr(76)),  // agentrate
      NumFormat.toDouble(arr(77)),  // lomarkrate
      NumFormat.toDouble(arr(78)),  // adxrate
      arr(79),                      // title
      arr(80),                      // keywords
      arr(81),                      // tagid
      arr(82),                      // callbackdate
      arr(83),                      // channelid
      NumFormat.toInt(arr(84))      // mediatype
    )
  }
}
主体类
package com.dmp.tools

import com.dmp.beans.Log
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/*
 * Example arguments for a local run:
 *   F:\牛牛学堂大数据24期\09-实训实战-9天\dmp&&移动项目\dmp\2016-10-01_06_p1_invalid.1475274123982.log.FINISH.bz2
 *   snappy
 *   C:\Users\admin\Desktop\result2
 */

/**
 * Converts a raw comma-separated ad log file into Parquet, partitioned by
 * province name and city name.
 *
 * Spark 1.6 defaults Parquet output to gzip compression and 2.x to snappy, so the
 * codec is set explicitly from the `compressionCode` argument.
 */
object Bzip2ParquetV2 {

  /** Minimum number of columns a log line needs to be mapped to a [[Log]]. */
  private val MinColumns = 85

  /**
   * Entry point.
   *
   * @param args exactly three arguments: `logInputPath`, `compressionCode`
   *             (value for spark.sql.parquet.compression.codec, e.g. snappy, gzip, lzo)
   *             and `resultOutputPath`
   */
  def main(args: Array[String]): Unit = {
    // 0. Validate the argument count.
    if (args.length != 3) {
      Console.err.println(
        """
          |com.dmp.tools.Bzip2ParquetV2
          |参数:
          | logInputPath
          | compressionCode <snappy, gzip, lzo>
          | resultOutputPath
        """.stripMargin // stripMargin removes everything up to and including each leading '|'
      )
      // Non-zero status so that calling scripts/schedulers see the failure.
      sys.exit(1)
    }

    // 1. Program arguments: log input path, Parquet compression codec, output path.
    val Array(logInputPath, compressionCode, resultOutputPath) = args

    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      // Run locally only when no master was supplied (e.g. via spark-submit --master).
      .setIfMissing("spark.master", "local[*]")
      // Kryo for RDD serialization (disk persistence and worker-to-worker transfer);
      // redundant if the cluster configuration already sets it.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Register the custom class for Kryo. This MUST happen before the SparkContext is
      // created: SparkContext clones the conf, so later changes to `conf` are ignored.
      .registerKryoClasses(Array(classOf[Log]))

    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      sqlContext.setConf("spark.sql.parquet.compression.codec", compressionCode)

      // 3. Read the raw log lines.
      val rawData = sc.textFile(logInputPath)

      // 4. ETL: split on commas, keeping trailing empty fields (limit -1); drop lines
      //    with too few columns; map the rest to Log beans.
      val logs = rawData
        .map(_.split(",", -1))
        .filter(_.length >= MinColumns)
        .map(Log(_))

      // 5. Write Parquet, partitioned by province name and then city name.
      sqlContext.createDataFrame(logs)
        .write
        .partitionBy("provincename", "cityname")
        .parquet(resultOutputPath)
    } finally {
      // Release the context even if the job fails.
      sc.stop()
    }
  }
}
最后
以上就是现实白猫最近收集整理的关于自定义bean对象实现日志转换parquet的全部内容,更多相关自定义bean对象实现日志转换parquet内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复