我是靠谱客的博主 包容乌龟,这篇文章主要介绍Join中数据倾斜问题解决,现在分享给大家,希望可以做个参考。

Join中数据倾斜问题解决

问题描述:

就是在一个reducer任务中累加的数量过大,而在另一个reducer任务累加的数量较少,这样就造成了数据倾斜

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-F2hcOloM-1632316508589)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918215036327.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GEvmJbm6-1632316508591)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918215625320.png)]

如上图,一个产品对应多个订单,但是假设iPhoneX卖得非常的好,而iPhone8P销量寥寥,那么我们在使用 mapreduce做数据分析的时候,我们的某个reducetask就会压力山大,而某些reducetask就很闲。 (这是把两个表放在一起进行处理)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Uroipon6-1632316508592)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918215734214.png)]

解决方案:

1、我们之所以产生数据倾斜,是因为我们使用reducetask这一个阶段来拼接pid相同的product和order,所以我们在reducetask才会产生数据倾斜

2、如果我们在maptask就能将product和order都join起 来,那么不需要reducetask就不会产生倾斜了

3、所以我们如果可以在map阶段就获取到产品的全表,那么读取到order表 就能够直接进行join了

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5Um2dtTT-1632316508593)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918221055288.png)]

分析:

遇到的问题

1、我们如何将product放在maptask?

2、我们并不确定maptask在哪

3、我们存放文件应该存放在maptask的job目录下,但是job目录是maptask启动之后才创建的。

4、把product拷贝到maptask下是一个难题

解决方案:

使用分布式缓存(Distributed Cache)在存储数据,然后maptask都从分布式缓存中读取,这样子就没有maptask不知道在哪里的问题以及redis的问题了。(也就是把两个表分开处理)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cbCdPvzI-1632316508594)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918224052864.png)]

复制代码
1
2
3
通过查看源码了解到,map最终是运行在run方法中的,而run方法其实是线程运行的方法,再继续观察发现,在调用map方法之前会调用一次setup方法     

*****简单的说处理这个问题就是:

(这里的主要目的就是把两张表提前进行连接,这样久不会产生数据倾斜问题)

1、建立一个集合存放商品信息表,(当然了在创建之前需要创建对象)

然后把相应的信息加入到集合中,如果没有值久设为空

2、在通过map进行

再次把信息加入到集合中,然后在输出,不用reduce阶段的聚合

这里主要是对集合的使用

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class BeanInfo implements WritableComparable<BeanInfo> { //正常是实现Writable接口的,但是Writable是用作value的 //WritableComparable是把这个对象看做key来用 private int orderId;//订单id private String date;//订单时间 private String pid;//商品id private int amount;//购买数量 private String pname;//商品名称 private int category_id;//商品分类id private double price;//商品价格 //创建构造方法 public BeanInfo() { } public void set(int orderId, String date, String pid, int amount, String pname, int category_id, double price) { this.orderId = orderId; this.date = date; this.pid = pid; this.amount = amount; this.pname = pname; this.category_id = category_id; this.price = price; } //创建get、set方法 public int getOrderId() { return orderId; } public void setOrderId(int orderId) { this.orderId = orderId; } public String getDate() { return date; } public void setDate(String date) { this.date = date; } public String getPid() { return pid; } public void setPid(String pid) { this.pid = pid; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } public int getCategory_id() { return category_id; } public void setCategory_id(int category_id) { this.category_id = category_id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } //实现这个接口,需要重写compareTo放方法 //在这里的这个主要作用是判断两个表的id号是否相同 //如果相同则返回0 @Override public int compareTo(BeanInfo o) { return this.category_id = o.category_id; } //这个是实现这个接口的序列化 //序列化是将对象的字段信息写入输出流 @Override public void write(DataOutput out) throws IOException { out.writeInt(this.orderId); //如果是字符串,要实现序列化需要使用writeUTF out.writeUTF(this.date); out.writeUTF(this.pid); out.writeInt(this.amount); out.writeUTF(this.pname); out.writeInt(this.category_id); out.writeDouble(this.price); } //这个实现这个接口的反序列化 //从输入流中读取各字段的信息 @Override public void readFields(DataInput in) throws IOException { this.orderId = in.readInt(); this.date = in.readUTF(); this.pid = in.readUTF(); this.amount = in.readInt(); this.pname = in.readUTF(); this.category_id = in.readInt(); this.price = in.readDouble(); } //实现toString()方法 @Override public String toString() { return orderId +" "+ date +" "+ pid +" "+ amount +" "+ pname +" "+ category_id +" "+ price; } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Iterator; import java.util.List; public class JoinMapper extends Mapper<LongWritable, Text,Text, NullWritable> { //创建一个集合,把订单表信息先行加入到集合中,如果没有该值则设为空 List<BeanInfo> beanInfoList = new ArrayList<>(); private Text k = new Text(); //读取product表中的数据 @Override protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { //1、控制读取数据 //这里是把数据放到idea中,通过反射进行读取 BufferedReader br = new BufferedReader(new InputStreamReader(this.getClass().getClassLoader().getResourceAsStream("product.txt"))); //2、遍历这个文件中的数据 String line = null; while ((line = br.readLine())!= null){ //实例化BeanInfo对象 BeanInfo beanInfo = new BeanInfo(); String[] words = line.split(","); //对对象进行赋值 beanInfo.set(0,"",words[0],0,words[1],Integer.parseInt(words[2]),Double.parseDouble(words[3])); beanInfoList.add(beanInfo); } //3、释放 br.close(); } //读取order信息,读取订单表信息 @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(","); int orderId = Integer.parseInt(words[0]); String date = words[1]; String pid = words[2]; int amount = Integer.parseInt(words[3]); //迭代product集合,用于比较集合中pid和order中的pid Iterator<BeanInfo> iterator = beanInfoList.iterator(); while (iterator.hasNext()){ BeanInfo next = iterator.next(); //获取pid,通过pid可以把数据加到相应的集合中数据里 String pid1 = next.getPid(); if (pid1.equals(pid)){ next.setOrderId(orderId); next.setDate(date); next.setAmount(amount); k.set(next.toString()); context.write(k,NullWritable.get()); } } } }

最后

以上就是包容乌龟最近收集整理的关于Join中数据倾斜问题解决的全部内容,更多相关Join中数据倾斜问题解决内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(105)

评论列表共有 0 条评论

立即
投稿
返回
顶部