This post works through an exercise from Hadoop in Action (Hadoop实战): analyzing a web server's log files with MapReduce.

Full code: github-wttttt

Tasks

  1. Count the number of visits from each IP address
  2. Find the top K IP addresses by visit count

Analysis:

  1. Task 1 is straightforward: a simple summation problem, useful for re-familiarizing yourself with how Hadoop MapReduce programs are written.
    • Optimization: use a combiner to reduce the amount of data shuffled across the network.
    • In this example the combiner and the reducer share the same logic, so the same reduce class can be used for both (a sketch of a separate combiner class follows the Task 1 code in the appendix).
    • The full code, with detailed comments, is in the appendix.
  2. Task 2 is a top-K problem. The key points are as follows (see the standalone sketch after this list):
    • Use a TreeMap to maintain the top K; it behaves somewhat like a max-heap.
    • Each mapper computes the top K of its own input split.
    • A mapper emits output only after its entire input split has been processed; the cleanup() method is used for this.
    • Start only one reducer so that the global top K can be selected; the reducer uses the same top-K technique as the mapper.
    • Note: if this is still unclear, see Appendix 3, which quotes another write-up that may explain it more clearly.
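
To make the TreeMap trick concrete, here is a minimal, Hadoop-free sketch of the same idea: keep at most K entries keyed by visit count, and whenever the map grows past K entries, evict the smallest key with firstKey(). The class name, K value, and the (count, IP) records below are made up purely for illustration.

import java.util.Map;
import java.util.TreeMap;

// Minimal illustration of the bounded-TreeMap top-K idea used by the mapper and reducer.
// TreeMap keeps its keys sorted, so firstKey() is always the smallest count seen so far;
// evicting it whenever the size exceeds K leaves exactly the K largest counts.
public class TopKSketch {
    public static void main(String[] args) {
        int K = 3;  // keep the 3 largest counts (hypothetical value)
        TreeMap<Integer, String> tree = new TreeMap<Integer, String>();

        // hypothetical (count, IP) records, e.g. what the Task 1 output might look like
        int[]    counts = {12, 7, 30, 5, 19, 25};
        String[] ips    = {"10.0.0.1", "10.0.0.2", "10.0.0.3",
                           "10.0.0.4", "10.0.0.5", "10.0.0.6"};

        for (int i = 0; i < counts.length; i++) {
            tree.put(counts[i], ips[i]);      // note: equal counts would overwrite each other
            if (tree.size() > K) {
                tree.remove(tree.firstKey()); // evict the current minimum
            }
        }

        // entries come out in ascending count order; use descendingMap() for largest-first
        for (Map.Entry<Integer, String> ent : tree.entrySet()) {
            System.out.println(ent.getValue() + "\t" + ent.getKey());
        }
    }
}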

Appendix:

  • Task 1 code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// LogCount: count each IP address's visits
// map():    emit <IPAddress, 1> for each line
// reduce(): sum to <IPAddress, n>
// a combiner is used to cut down shuffle traffic
public class LogCount {

    public static class xxMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // the IP address is the first space-separated field of each line
            word.set(value.toString().split(" ")[0]);
            context.write(word, one);
        }
    }

    public static class xxReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // for each key, sum up the counts
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // parse the remaining command-line arguments: input path(s) and output path
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // sanity-check the arguments
        if (otherArgs.length < 2) {
            System.err.println("Usage: LogCount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Log Count");
        job.setJarByClass(LogCount.class);
        job.setMapperClass(xxMapper.class);
        // The combiner and the reducer share the same class because their logic is
        // identical here; otherwise, write a separate combiner class, which normally
        // also extends Reducer.
        job.setCombinerClass(xxReducer.class);
        job.setReducerClass(xxReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
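The comment in main() notes that if the combiner's logic differed from the reducer's, a separate combiner class would be written. As a minimal sketch (not part of the original code, and with the hypothetical name xxCombiner), such a class would sit inside LogCount next to xxReducer, reuse the imports above, and also extend Reducer:

    // Hypothetical standalone combiner for LogCount (xxCombiner is a made-up name).
    // Same key/value types as the reducer; it pre-sums the 1s emitted by a single
    // map task before they are written to disk and shuffled.
    public static class xxCombiner
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable partial = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            partial.set(sum);
            context.write(key, partial);   // partial count for this map task only
        }
    }

    // and in main(): job.setCombinerClass(xxCombiner.class);

Keep in mind that Hadoop may run the combiner zero, one, or several times on a map task's output, so a combiner is only safe when the operation is associative and commutative, as summation is; that is also why reusing xxReducer as the combiner works here.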
  • Task 2 code:
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * The top-K problem.
 * Log_Max_k: find the K most frequent visitors' IP addresses.
 *
 * map(): compute the top K for each mapper
 *   - use a TreeMap to hold each mapper's current top K
 *   - for every record we try to update the TreeMap; once the input split is
 *     exhausted, the TreeMap holds that split's local top K
 *   - the TreeMap behaves somewhat like a max-heap
 *   - unlike the usual pattern (write once per input line), we write only after
 *     the whole input split has been processed, via cleanup(), which runs after
 *     the map task has consumed all of its input
 * reduce(): compute the global top K in a single reducer
 *   - exactly one reducer is needed to guarantee a global top K
 *
 * The value of K is passed in from the command line to the Mapper and Reducer
 * through the Configuration (conf.set() / conf.getInt()).
 */
public class Log_Max_k {

    public static class xxMap
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * the map function
         * input file format (one record per line): IPAddress \t VisitNum
         * (i.e. the output of the LogCount job)
         */
        private TreeMap<Integer, Text> tree = new TreeMap<Integer, Text>();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // read the K parameter from the Configuration attached to the Context
            Configuration conf = context.getConfiguration();
            int K = conf.getInt("K_value", 10);              // default = 10
            String[] values = value.toString().split("\t");  // tab-separated
            Text txt = new Text();
            txt.set(values[0]);
            // note: IPs with identical visit counts overwrite each other here;
            // acceptable for this exercise
            tree.put(Integer.parseInt(values[1]), txt);
            if (tree.size() > K) {
                tree.remove(tree.firstKey());  // keep only the K largest counts
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // write only after the whole input split has been handled
            for (Map.Entry<Integer, Text> ent : tree.entrySet()) {
                // write: IPAddress  VisitNum
                context.write(ent.getValue(), new IntWritable(ent.getKey().intValue()));
            }
        }
    }

    public static class xxReduce
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private TreeMap<IntWritable, Text> tree = new TreeMap<IntWritable, Text>();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            int K = conf.getInt("K_value", 10);  // default = 10
            for (IntWritable visit_num : values) {
                // copy the objects: Hadoop reuses the Writables it passes to reduce()
                tree.put(new IntWritable(visit_num.get()), new Text(key));
                if (tree.size() > K) {
                    tree.remove(tree.firstKey());
                }
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // emit the global top K once, after every key has been reduced
            for (Map.Entry<IntWritable, Text> ent : tree.entrySet()) {
                context.write(ent.getValue(), ent.getKey());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // parse the remaining command-line arguments: K, input path(s) and output path
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // sanity-check the arguments
        if (otherArgs.length < 3) {
            System.err.println("Usage: Log_Max_k <K> <in> [<in>...] <out>");
            System.exit(2);
        }
        // set K before the Job is created, because the Job copies the Configuration
        conf.set("K_value", otherArgs[0]);
        Job job = new Job(conf, "TopKIP");
        job.setJarByClass(Log_Max_k.class);
        job.setMapperClass(xxMap.class);
        job.setReducerClass(xxReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(1);  // a single reducer yields the global top K
        for (int i = 1; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
  • Appendix 3: a more detailed walkthrough of the top-K approach:

    1. Mappers
      Use the default number of mappers: each input split is handled by one map task.
      In each map task we find the top K records of that input split. A TreeMap is used to hold the top K because it is easy to update (it behaves here much like a max-heap). For every record we try to update the TreeMap, so once the split has been fully processed the TreeMap holds the local top K of that split. One thing to note: in a typical mapper, context.write (or output.collect) is called once per record. Here it is not; we emit only after the whole input split has been processed, so the context.write calls are placed in cleanup(), the method that runs once after the map task has consumed all of its input.
    2. Reducers
      As explained above, there is only one reducer: it aggregates the output of all the mappers and selects the global top K from it. The quoted write-up adds: "Note that we are using NullWritable here. The reason for this is we want all of the outputs from all of the mappers to be grouped into a single key in the reducer." That remark describes the quoted author's variant; the Task 2 code above achieves the same effect by simply setting the number of reduce tasks to 1 (a sketch of the NullWritable variant follows below).
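For reference, here is a minimal sketch of the NullWritable variant the quote refers to; it is not what the Task 2 code above does, and the class and field names are made up. Every mapper packs its local top K into tab-separated values emitted under the single key NullWritable.get(), so one reduce() call sees every candidate and can pick the global top K in one pass. The job wiring (setMapOutputKeyClass(NullWritable.class), setMapOutputValueClass(Text.class), input/output paths) is omitted.

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of the NullWritable-keyed variant: all map output shares one key,
// so a single reduce() call receives every per-mapper candidate.
public class TopKNullKeySketch {

    public static class LocalTopKMapper
            extends Mapper<LongWritable, Text, NullWritable, Text> {

        private final TreeMap<Integer, String> tree = new TreeMap<Integer, String>();

        @Override
        public void map(LongWritable key, Text value, Context context) {
            int K = context.getConfiguration().getInt("K_value", 10);
            String[] fields = value.toString().split("\t");  // IPAddress \t VisitNum
            tree.put(Integer.parseInt(fields[1]), fields[0]);
            if (tree.size() > K) {
                tree.remove(tree.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // emit the local top K under the single NullWritable key
            for (Map.Entry<Integer, String> ent : tree.entrySet()) {
                context.write(NullWritable.get(),
                              new Text(ent.getValue() + "\t" + ent.getKey()));
            }
        }
    }

    public static class GlobalTopKReducer
            extends Reducer<NullWritable, Text, Text, IntWritable> {

        @Override
        public void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int K = context.getConfiguration().getInt("K_value", 10);
            TreeMap<Integer, String> tree = new TreeMap<Integer, String>();
            for (Text val : values) {
                String[] fields = val.toString().split("\t");
                tree.put(Integer.parseInt(fields[1]), fields[0]);
                if (tree.size() > K) {
                    tree.remove(tree.firstKey());
                }
            }
            for (Map.Entry<Integer, String> ent : tree.entrySet()) {
                context.write(new Text(ent.getValue()), new IntWritable(ent.getKey()));
            }
        }
    }
}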
