我是靠谱客的博主 花痴火,这篇文章主要介绍Hadoop编程——第五章:(5)MapReduce自定义对象序列化案例案例1:各州累计病例数量统计一、需求分析二、代码实现,现在分享给大家,希望可以做个参考。

美国新冠疫情COVID-19病例数统计

有一份2020-12-01号美国各县county的新冠疫情统计数据,包括累计确诊病例、累计死亡病例。使用MapReduce对疫情数据进行各种分析统计。
案例背后的核心是学会自定义MapReduce各个组件。包括自定义对象、序列化、排序、分区、分组。

数据字段说明

date (日期) , county(县) , state(州 ) , fips(县编码code ) , cases(累计确诊病例) , deaths(索计死亡病例)。

案例学习目标:

MapReduce自定义对象序列化
MapReduce自定义排序
MapReduce自定义分区
MapReduce自定义分组
MapReduce自定义分组扩展:topN

案例1:各州累计病例数量统计

统计美国2020-12-01,每个州state累计确诊案例数、累计死亡案例数。

一、需求分析

“州”作为key,CovidCountBean将两个案例数(cases,deaths)封装起来

  • 对于涉及多属性数据传递,建议使用javaBean进行封装传递
  • 注意在MapReduce中需要实现序列化机制
  • 如果是作为key传递还需要实现Compareable接口

在reduce阶段key相同的就会被分成一组,一组调用一次reduce方法处理。
在本业务中key相同即是同一个州的数据
把各个县的病例累计求和就是该州的疫情数据。

注意要使用toString方法。

二、代码实现

(一)pom文件

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>USA_Covid</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version> 2.9.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version> 2.9.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version> 2.9.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.4</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib</classpathPrefix> <mainClass>CovidSumDriver</mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>

(二)log4j.properties

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
log4j.rootLogger=INFO,STDOUT,R #把日志信息打印到控制台上 log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender log4j.appender.STDOUT.Threshold=DEBUG log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout log4j.appender.STDOUT.layout.ConversionPattern=[%p] [%l] %10.10c - %m%n #放在工程路径下,通过文件查看日志 log4j.appender.R=org.apache.log4j.RollingFileAppender log4j.appender.R.File=mapreduce_test.log log4j.appender.R.MaxFileSize=1MB log4j.appender.R.MaxBackupIndex=1 log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n log4j.logger.org.example=DEBUG

(三)CovidCountBean封装两个属性

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 自定义对象作为数据类型在MR中传递 * 一定要实现Hadoop的序列化机制:接口Writable, ctrl+i 实现两个方法 */ public class CovidCountBean implements Writable { //1、封装私有的属性 private long cases; //确诊病例数 private long deaths; //死亡病例数 //2、有参无参构造 public CovidCountBean() { } public CovidCountBean(long cases, long deaths) { this.cases = cases; this.deaths = deaths; } /** *对有参构造进行修改,提供一个set方法 * 自己封装对象的set方法,用于对象属性赋值 */ public void set(long cases, long deaths) { this.cases = cases; this.deaths = deaths; } //3、set和get方法 public long getCases() { return cases; } public void setCases(long cases) { this.cases = cases; } public long getDeaths() { return deaths; } public void setDeaths(long deaths) { this.deaths = deaths; } //4、实现对象的方法 // @Override // public String toString() { // return "CovidCountBean{" + // "cases=" + cases + // ", deaths=" + deaths + // '}'; // } //修改一下,返回的都是数据 @Override public String toString() { return cases + "t" + deaths; } /** * 序列化方法,可以控制把哪写字段序列化出去 */ @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(cases); dataOutput.writeLong(deaths); } /** * 反序列化方法 * todo 注意反序列化的顺序和序列化顺序一致 */ @Override public void readFields(DataInput dataInput) throws IOException { this.cases = dataInput.readLong(); this.deaths = dataInput.readLong(); } }

(四)CovidSumMapper

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class CovidSumMapper extends Mapper<LongWritable, Text, Text,CovidCountBean> { /** * 3、创建输出对象 */ Text outKey = new Text(); CovidCountBean outValue = new CovidCountBean(); /** * 1、重写map父类方法:map回车 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { /** * 2、读取一行数据进行切割 */ String[] fields = value.toString().split(","); // String line = value.toString(); // String[] fields = line.split("|"); /** * 4、提取数据 州、确诊数、死亡数 */ outKey.set(fields[2]);//下标从0开始 // outValue.set(Long.parseLong(fields[4]),Long.parseLong(fields[5]));//但outValue没有set方法,所以要去改造一下 //因为疫情数据中,会有部分数据缺少,正着数就会越界,可以采用倒着数 outValue.set(Long.parseLong(fields[fields.length-2].trim()),Long.parseLong(fields[fields.length-1].trim())); /** * 5、输出结果 */ context.write(outKey,outValue); } }

(五)CovidSumReducer

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class CovidSumReducer extends Reducer<Text,CovidCountBean,Text,CovidCountBean> { CovidCountBean outValue = new CovidCountBean(); @Override protected void reduce(Text key, Iterable<CovidCountBean> values, Context context) throws IOException, InterruptedException { /** * 创建统计变量 */ long totalCases = 0; long totalDeaths = 0; /** * 遍历该州的各个县的数据 */ for(CovidCountBean value : values){ totalCases += value.getCases(); totalDeaths = totalDeaths +value.getDeaths(); } /** * 输出结果赋值 */ outValue.set(totalCases,totalDeaths); context.write(key,outValue); } }

(六)CovidSumDriver

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 该类就是MapReduce程序客户端驱动类 * 主要是构造Job对象实例 * 指定各种组件属性:mapper、reducer类,输入输出的数据类型,输入输出的数据路径,提交job作业(job.submit()) */ public class CovidSumDriver { public static void main(String[] args) throws Exception { //创建驱动类 Configuration conf = new Configuration(); //本地模式运行可以直接指定路径,或者配置Configurations // args = new String[] {"F:/MyBigData/Hadoop/data/mr_wordcount/input ","F:/MyBigData/Hadoop/data/mr_wordcount/output"}; //设置mapreduce程序的运行模式 // conf.set("mapreduce.framework.name","local"); //构造job作业的实例,参数(配置对象,job名字) Job job = Job.getInstance(conf, CovidSumDriver.class.getSimpleName()); //设置mr程序运行的主类 job.setJarByClass(CovidSumDriver.class); //设置本次mr程序的mapper类型、reducer类型 job.setMapperClass(CovidSumMapper.class); job.setReducerClass(CovidSumReducer.class); //指定mapper阶段输出的key value数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CovidCountBean.class); //指定reducer阶段输出的key value数据类型,也是mr程序最终的输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(CovidCountBean.class); //配置本次作业的输入数据路径和输出数据路径 Path inputPath = new Path(args[0]); Path outputPath = new Path(args[1]); //todo 默认组件 TextInputFormat TextOutputFormat FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job,outputPath); //todo 判断输出路径是否已经存在,如果已经存在,先删除 FileSystem fs = FileSystem.get(conf); if(fs.exists(outputPath)){ fs.delete(outputPath,true); //递归删除 } //最终提交本次job作业 //job.submit(); //采用waitForCompletion提交job,参数表示是否开启实时监视追踪作业的执行情况 boolean resultFlag = job.waitForCompletion(true); //退出程序 和job结果进行绑定, 0是正常退出,1是异常退出 System.exit(resultFlag ? 0: 1); } }

最后

以上就是花痴火最近收集整理的关于Hadoop编程——第五章:(5)MapReduce自定义对象序列化案例案例1:各州累计病例数量统计一、需求分析二、代码实现的全部内容,更多相关Hadoop编程——第五章内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(91)

评论列表共有 0 条评论

立即
投稿
返回
顶部