Partitioner -- Partitioning
Its main job is to send each map output record to the appropriate reducer.

The Partitioner component lets the map side partition its output by key, so that different keys are dispatched to different reducers for processing. A custom partitioner must likewise extend this class. HashPartitioner is MapReduce's default partitioner; it computes the target reducer as (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. HashPartitioner processes mapper output: getPartition() takes three parameters, where key and value are the mapper's output record and numReduceTasks is the configured number of reducer tasks, which defaults to 1. Any integer modulo 1 is 0, so in the default configuration getPartition(...) always returns 0: all mapper output goes to a single reducer and ends up in a single output file. To spread the final output across multiple files, the mapper output must be divided into multiple partitions; we simply need a rule that makes getPartition(...) return 0, 1, 2, 3, and so on.
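For reference, the default partitioner is tiny; the following sketch mirrors Hadoop's org.apache.hadoop.mapreduce.lib.partition.HashPartitioner:

import org.apache.hadoop.mapreduce.Partitioner;

// The default partitioner: mask off the sign bit of the hash code,
// then take it modulo the number of reduce tasks.
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}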
Usually the default partitioning function is sufficient, but sometimes a special requirement calls for a custom Partitioner to implement our business logic. A typical case:

Partition phone-number records by carrier (China Mobile, China Unicom, China Telecom) and save them into three separate files.
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// DataCountMap, DataCountReduce and DataBean are defined elsewhere in the original series.
public class DataCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DataCount.class);

        job.setMapperClass(DataCountMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(DataCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // The number of reduce tasks determines the number of partitions/output files.
        job.setNumReduceTasks(Integer.parseInt(args[2]));
        job.setPartitionerClass(DataCountPartitioner.class);

        job.waitForCompletion(true);
    }

    public static class DataCountPartitioner extends Partitioner<Text, DataBean> {

        // Map phone-number prefixes to partition numbers: one partition per carrier.
        private static Map<String, Integer> map = new HashMap<String, Integer>();
        static {
            map.put("139", 1);
            map.put("153", 2);
            map.put("182", 3);
        }

        /**
         * numPartitions is the number of partitions: as many as the number
         * of reduce tasks the job starts.
         */
        @Override
        public int getPartition(Text key, DataBean bean, int numPartitions) {
            String account = key.toString();
            String tel_sub = account.substring(0, 3);
            Integer count = map.get(tel_sub);
            if (count == null) {
                count = 0; // unknown prefixes fall into partition 0
            }
            return count;
        }
    }
}
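Note that getPartition() here can return 0, 1, 2 or 3, so the job should be launched with at least four reduce tasks (args[2] >= 4); that yields the three carrier files plus a catch-all partition 0 for unrecognized prefixes. With a single reduce task Hadoop bypasses the custom partitioner entirely, but with two or three reducers the out-of-range partition numbers cause the map tasks to fail at runtime with an "Illegal partition" error.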
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SortStep {

    public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {
        private InfoBean k = new InfoBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line: account \t income \t expenses
            String line = value.toString();
            String[] fields = line.split("\t");
            k.set(fields[0], Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
            // Emit the bean as the key so the shuffle sorts by InfoBean.compareTo().
            context.write(k, NullWritable.get());
        }
    }

    public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {
        private Text k = new Text();

        @Override
        protected void reduce(InfoBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            k.set(key.getAccount());
            context.write(k, key);
        }
    }
}
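The original post does not show a driver for SortStep; a minimal sketch (the class name SortStepDriver and the two-argument input/output invocation are assumptions) would wire it up like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortStepDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SortStepDriver.class);
        job.setMapperClass(SortStep.SortMapper.class);
        // InfoBean is the map output key, so the shuffle sorts by InfoBean.compareTo().
        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(SortStep.SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}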
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class InfoBean implements WritableComparable<InfoBean> {

    private String account;
    private double income;
    private double expenses;
    private double surplus;

    // set() and the getters below are not shown in the original post but are
    // required by SortStep; surplus = income - expenses is an assumption.
    public void set(String account, double income, double expenses) {
        this.account = account;
        this.income = income;
        this.expenses = expenses;
        this.surplus = income - expenses;
    }

    public String getAccount() { return account; }
    public double getIncome() { return income; }
    public double getExpenses() { return expenses; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(account);
        out.writeDouble(income);
        out.writeDouble(expenses);
        out.writeDouble(surplus);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.account = in.readUTF();
        this.income = in.readDouble();
        this.expenses = in.readDouble();
        this.surplus = in.readDouble();
    }

    @Override
    public int compareTo(InfoBean o) {
        // Sort by income first, then by expenses; never returns 0.
        if (this.income == o.getIncome()) {
            return this.expenses > o.getExpenses() ? 1 : -1;
        }
        return this.income > o.getIncome() ? 1 : -1;
    }

    @Override
    public String toString() {
        return income + "\t" + expenses + "\t" + surplus;
    }
}
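Note that compareTo() above never returns 0, so no two InfoBean keys ever compare as equal; each record therefore reaches reduce() as its own group, which is exactly what a pure sort job wants. A WritableComparable that is also used for grouping would instead need to return 0 for equal keys and override hashCode()/equals() consistently.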
Combiner
Data format transformation:
map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

The combiner runs as an optional, map-side pre-aggregation step between map and reduce, so its input and output types must both match the map output types (K2, V2); only the reducer may change the key/value types. A sketch follows.
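As an illustration (not from the original post): the classic word count job can reuse its reducer as a combiner, because integer summation is associative and commutative. A minimal sketch:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountWithCombiner {

    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE); // (K2, V2) = (Text, IntWritable)
            }
        }
    }

    // Summing is associative and commutative, so the same class can serve as
    // both combiner and reducer; its in/out types are all (Text, IntWritable).
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountWithCombiner.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class); // local pre-aggregation on the map side
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Because a combiner may run zero, one, or several times per map task, reusing the reducer is only safe when re-applying the function does not change the result; a job computing an average, for example, cannot use its reducer as a combiner directly.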