Straight to the code. Technical points covered:
- Parsing JSON into a relational (tabular) data structure
- MapReduce output to multiple files
- Suppressing MapReduce's default output files

configuration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");

controls whether the _SUCCESS marker file is generated.
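As a minimal, illustrative sketch of where that switch would sit in a driver (the class name SuccessFileDemo is hypothetical; the real driver is JsonParser below, which does not set this property):

package com.leboop.www.json;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SuccessFileDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Without this setting, FileOutputCommitter writes an empty _SUCCESS
        // marker file into the output directory when the job succeeds.
        configuration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
        Job job = Job.getInstance(configuration, "SuccessFileDemo");
        // Confirm the switch took effect.
        System.out.println(job.getConfiguration()
                .get("mapreduce.fileoutputcommitter.marksuccessfuljobs"));
    }
}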
(1)FileOutputFormat
package com.leboop.www.json;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * Created by leboop on 2020/7/18.
 */
class FileOutputFormat extends TextOutputFormat<NullWritable, Text> {
    @Override
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
        // Name the work file with just the output name, dropping the
        // default task suffix and extension.
        FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
        return new Path(committer.getWorkPath(), getOutputName(context));
    }
}
The custom FileOutputFormat removes the task suffix (-r-00000, or -m-00000 in a map-only job like this one) from generated file names. For example, to produce a file named file1: without this class, the output would be named something like file1-r-00000 instead.
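For contrast, the stock implementation in Hadoop's org.apache.hadoop.mapreduce.lib.output.FileOutputFormat looks roughly like this (paraphrased, not copied verbatim; the task suffix comes from getUniqueFile):

@Override
public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
    FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
    // getUniqueFile appends the task id, e.g. "file1" -> "file1-m-00000".
    return new Path(committer.getWorkPath(),
            getUniqueFile(context, getOutputName(context), extension));
}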
(2)JsonParser
package com.leboop.www.json;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.util.List;
import java.util.Map;

/**
 * Created by leboop on 2020/7/18.
 */
public class JsonParser {
    public static void parse(Path path) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://192.168.128.11:9000");
        System.setProperty("HADOOP_USER_NAME", "root");
        Job job = Job.getInstance(configuration, "JsonParser");
        job.setJarByClass(Main.class);
        job.setMapperClass(JsonMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(FileOutputFormat.class);
        // Map-only job: no reduce phase.
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, path);
        Map<String, List<String>> saveFileMap = Utils.readSaveFile();
        for (Map.Entry<String, List<String>> entry : saveFileMap.entrySet()) {
            System.out.println(entry.getKey());
            // Register one named output per target file ("table").
            MultipleOutputs.addNamedOutput(job, entry.getKey(), FileOutputFormat.class,
                    NullWritable.class, Text.class);
        }
        // Base directory shared by all generated files.
        FileOutputFormat.setOutputPath(job, new Path("/output/json"));
        // Only create output files when something is actually written.
        LazyOutputFormat.setOutputFormatClass(job, FileOutputFormat.class);
        job.waitForCompletion(true);
    }
}
LazyOutputFormat.setOutputFormatClass(job, FileOutputFormat.class); suppresses the extra, otherwise-empty default part file (part-r-00000-style): the wrapped output format is only instantiated when the first record is actually written through it.

setOutputPath here sets the base directory shared by all output files. For example, if the generated files should end up at

/output/json/d1/file1
/output/json/d2/file2
/output/json/d3/file1

then the base directory is simply /output/json.
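A toy, self-contained illustration (hypothetical class PathDemo, plain local code) of how the baseOutputPath strings passed to MultipleOutputs resolve against that base directory:

import org.apache.hadoop.fs.Path;

public class PathDemo {
    public static void main(String[] args) {
        Path base = new Path("/output/json");
        // The kind of baseOutputPath values a mapper would pass
        // to multipleOutputs.write(...).
        String[] rels = {"d1/file1", "d2/file2", "d3/file1"};
        for (String rel : rels) {
            // Prints /output/json/d1/file1, /output/json/d2/file2, ...
            System.out.println(new Path(base, rel));
        }
    }
}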
(3)JsonMapper
package com.leboop.www.json;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.util.*;

/**
 * Created by leboop on 2020/7/18.
 */
public class JsonMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> multipleOutputs = null;
    /**
     * Field name -> field values (a field may have several values).
     */
    private Map<String, List<String>> map = new HashMap<String, List<String>>();
    /**
     * Output file names; each file corresponds to one relational table.
     */
    private Map<String, List<String>> saveFileMap = new HashMap<String, List<String>>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<>(context);
        saveFileMap = Utils.readSaveFile();
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Reset per-record state so fields from a previous line do not leak
        // into this one.
        map.clear();
        String jsonStr = value.toString();
        // Use Gson to turn the JSON string into a JSON tree.
        com.google.gson.JsonParser jsonParser = new com.google.gson.JsonParser();
        JsonElement e = jsonParser.parse(jsonStr);
        System.out.println("JSON string read:");
        System.out.println(e);
        // Flatten the JSON tree into the field map.
        StringBuilder keySb = new StringBuilder();
        jsonTree(e, keySb);
        System.out.println("Parsed JSON data:");
        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
            System.out.println(entry.getKey() + " = " + entry.getValue());
        }
        for (Map.Entry<String, List<String>> entry : saveFileMap.entrySet()) { // iterate over output files
            int maxSize = 0;
            List<List<String>> tmp = new ArrayList<List<String>>();
            for (String fieldName : entry.getValue()) { // iterate over fields
                List<String> fieldValueList = map.get(fieldName);
                // The field may be absent in this record.
                if (fieldValueList != null) {
                    if (fieldValueList.size() > maxSize) {
                        maxSize = fieldValueList.size();
                    }
                    tmp.add(fieldValueList);
                } else {
                    tmp.add(Arrays.asList(""));
                }
            }
            // Fill the 2D array.
            String[][] arr = fill(tmp, maxSize);
            // Turn the array into a writable string.
            String resultStr = arrToStr(arr);
            // Write to the target file.
            System.out.println("Saving file " + entry.getKey() + ":");
            System.out.println(resultStr);
            System.out.println("++++++++++++++++++++++++++++++++++++++++++");
            multipleOutputs.write(entry.getKey(), null, new Text(resultStr),
                    String.valueOf(Time.now()) + "/" + entry.getKey());
        }
    }

    /**
     * Convert the 2D array into a string that can be written to a file.
     *
     * @param arr
     * @return
     */
    private String arrToStr(String[][] arr) {
        StringBuilder resultSb = new StringBuilder();
        for (int r = 0; r < arr.length; r++) {
            for (int c = 0; c < arr[r].length; c++) {
                resultSb.append(arr[r][c]).append("|");
            }
            resultSb.append("\n");
        }
        return resultSb.toString();
    }

    /**
     * Fill field values into a maxSize-row table, replicating shorter columns.
     *
     * @param tmp
     * @param maxSize
     * @return
     */
    private String[][] fill(List<List<String>> tmp, int maxSize) {
        String[][] arr = new String[maxSize][tmp.size()];
        if (tmp.size() > 0) {
            for (int c = 0; c < tmp.size(); c++) {
                List<String> curr = tmp.get(c);
                if (maxSize % curr.size() != 0) {
                    // Abnormal data: column length does not divide maxSize.
                    break;
                } else {
                    int rep = maxSize / curr.size(); // number of copies
                    for (int k = 0; k < curr.size(); k++) {
                        // row index = k * rep + r
                        for (int r = 0; r < rep; r++) {
                            arr[k * rep + r][c] = curr.get(k);
                        }
                    }
                }
            }
        }
        return arr;
    }

    /**
     * Recursively walk the JSON tree, collecting leaf values keyed by their path.
     *
     * @param e
     * @param key
     */
    private void jsonTree(JsonElement e, StringBuilder key) {
        // Primitive (or null) leaf value.
        if (e.isJsonNull() || e.isJsonPrimitive()) {
            String keyStr = key.toString();
            if (map.containsKey(keyStr)) {
                List<String> list = map.get(keyStr);
                list.add(e.toString());
                map.put(keyStr, list);
            } else {
                List<String> list = new ArrayList<String>();
                list.add(e.toString());
                map.put(keyStr, list);
            }
            key.setLength(0);
            return;
        }
        // JSON array. Note: an array of objects is not necessarily the OA field;
        // conversely, OA is always an array of objects.
        if (e.isJsonArray()) {
            JsonArray jsonArr = e.getAsJsonArray();
            if (null != jsonArr) {
                for (JsonElement je : jsonArr) {
                    if (je.isJsonNull() || je.isJsonPrimitive()) {
                        // Array elements are still primitives: concatenate them
                        // into one comma-separated value.
                        String keyStr = key.toString();
                        if (map.containsKey(keyStr)) {
                            List<String> list = map.get(keyStr);
                            list.set(0, map.get(keyStr).get(0) + "," + je.toString());
                            map.put(keyStr, list);
                        } else {
                            List<String> list = new ArrayList<String>();
                            list.add(je.toString());
                            map.put(keyStr, list);
                        }
                    } else {
                        // Not a primitive: keep recursing.
                        jsonTree(je, key);
                    }
                }
            }
        }
        // JSON object.
        if (e.isJsonObject()) {
            // All key=value pairs of the object.
            Set<Map.Entry<String, JsonElement>> es = e.getAsJsonObject().entrySet();
            String pKey = key.toString(); // remember the parent path
            for (Map.Entry<String, JsonElement> en : es) {
                if (pKey.length() > 0) {
                    // Join parent and child with the "-->" separator.
                    key = new StringBuilder(pKey + "-->" + en.getKey());
                } else {
                    key = new StringBuilder(en.getKey());
                }
                JsonElement element = en.getValue();
                jsonTree(element, key); // recurse
            }
        }
    }
}
multipleOutputs.write(entry.getKey(), null, new Text(resultStr), String.valueOf(Time.now()) + "/" + entry.getKey());

writes to multiple files under multiple directories. Note the last argument, String.valueOf(Time.now()) + "/" + entry.getKey(): the part left of the "/" is the directory, and the part right of it must match the first argument, entry.getKey(), and becomes the final file name. It can also be written as:

multipleOutputs.write(NullWritable.get(), new Text(resultStr), String.valueOf(Time.now()) + "/" + entry.getKey());
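A minimal sketch contrasting the two write overloads (the mapper below, WriteDemoMapper, is hypothetical; overload 1 assumes a named output "file1" was registered with addNamedOutput in the driver):

package com.leboop.www.json;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

class WriteDemoMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> out;

    @Override
    protected void setup(Context context) {
        out = new MultipleOutputs<>(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Overload 1: named output (must be registered with addNamedOutput)
        // plus a "directory/fileName" baseOutputPath.
        out.write("file1", NullWritable.get(), value, "d1/file1");
        // Overload 2: no named output; key/value go through the job's default
        // output format, and baseOutputPath works the same way.
        out.write(NullWritable.get(), value, "d1/file1");
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        out.close();
    }
}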
(4)Utils
package com.leboop.www.json;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by leboop on 2020/7/18.
 */
public class Utils {
    /**
     * Read from the meta file which fields go into which output file
     * (each output file can be thought of as a database table).
     */
    public static Map<String, List<String>> readSaveFile() {
        Map<String, List<String>> saveFileMap = new HashMap<String, List<String>>();
        try {
            BufferedReader metaBR = new BufferedReader(new FileReader(
                    new File("G:\\idea_workspace\\MapReduce\\data\\meta")));
            String line;
            while ((line = metaBR.readLine()) != null) {
                // Each line looks like "file1:field1|field2|...".
                String k = line.split(":")[0];
                String[] vs = line.split(":")[1].split("\\|");
                List<String> list = new ArrayList<String>();
                for (String v : vs) {
                    list.add(v);
                }
                saveFileMap.put(k, list);
            }
            System.out.println("Output file names:");
            for (Map.Entry<String, List<String>> entry : saveFileMap.entrySet()) {
                System.out.println(entry.getKey() + " = " + entry.getValue());
            }
            System.out.println("+++++++++++++++++++++++++++++++");
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        return saveFileMap;
    }
}
(5)Main
package com.leboop.www.json;

import org.apache.hadoop.fs.Path;

/**
 * Created by leboop on 2020/7/18.
 */
public class Main {
    public static void main(String[] args) throws Exception {
        Path path = new Path("/json/data.json");
        JsonParser.parse(path);
    }
}
(6)data.json
1{"OA":[{"rd":1,"rn":"s"},{"rd":2,"rn":"d"}],"OOA":[{"a":1,"b":[{"c":1,"d":[{"e":1},{"e":2}]},{"c":2,"d":[{"e":2},{"e":2}]}]},{"a":2,"b":[{"c":1,"d":[{"e":1},{"e":1}]},{"c":2,"d":[{"e":2},{"e":2}]}]}],"name":{"c":"RickandMorty","d":"dd"},"hobby":["t","m",{"s":true}],"id":"kladu8yak8asdha8","boolean":true,"number":3,"k":null,"ARRAY":[{"FIRST":"Rick","SEC":"tt"},{"LAST":"Morty"}]}
(7)meta
file1:name-->c|name-->d|hobby|hobby-->s|id|boolean|number|k|ARRAY-->FIRST|ARRAY-->SEC|ARRAY-->LAST
file2:id|OA-->rd|OA-->rn
file3:id|OOA-->a
file4:OOA-->a|OOA-->b-->c|OOA-->b-->d-->e
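As a hand-derived illustration (worked through the mapper logic above, not captured from a real run): for file2, defined as id|OA-->rd|OA-->rn, the single id value is replicated to match the two OA elements, so the written content should come out roughly as

"kladu8yak8asdha8"|1|"s"|
"kladu8yak8asdha8"|2|"d"|

String values keep their quotes because the mapper stores JsonElement.toString() for each primitive.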