Table of Contents
- Official HBase-MapReduce
- Custom HBase-MapReduce 1
- Custom HBase-MapReduce 2
- Running custom HBase-MapReduce 2 from IDEA (no packaging needed)
Official HBase-MapReduce
1. View the classpath entries HBase's MapReduce tasks require:

bin/hbase mapredcp
2. Import the environment variables
To let Hadoop load HBase's jars, the simplest approach is to copy the HBase jars into Hadoop's lib directory, or to add the HBase jar paths to Hadoop's environment variables.
(1) Import the environment variables (temporary; effective only for the current shell, run on the command line):

$ export HBASE_HOME=/export/servers/HBase-1.3.1
$ export HADOOP_HOME=/export/servers/hadoop-2.7.7
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
(2) Permanent: configure in /etc/profile:

export HBASE_HOME=/export/servers/HBase-1.3.1
export HADOOP_HOME=/export/servers/hadoop-2.7.7
Also add the following to hadoop-env.sh (note: place it after the for loop), and remember to distribute the modified file to every node:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/export/servers/HBase/lib/*
3. Run an official MapReduce task
Case 1: count how many rows the student table has (run from the HBase home directory so the relative lib/ path resolves):

/export/servers/hadoop-2.7.7/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
Case 2: use MapReduce to import local data into HBase
1) Create a TSV file locally: fruit.tsv (fields separated by tabs)

1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow
2) Create the HBase table:

hbase(main):001:0> create 'fruit','info'
3) Upload fruit.tsv to HDFS:

hadoop fs -put fruit.tsv /
4) Run the importtsv MapReduce job to load the file into HBase's fruit table. The -Dimporttsv.columns list must match the TSV field order, with HBASE_ROW_KEY marking the rowkey column; tab is importtsv's default separator:

/export/servers/hadoop-2.7.7/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit hdfs://hadoop102:9000/fruit.tsv
5) Use scan to check the imported data:

hbase(main):001:0> scan 'fruit'
Custom HBase-MapReduce 1
Goal: use MR to move a file from HDFS into the fruit1 table.
hbase(main):019:0> create 'fruit1','info'
Code
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Iterate over the values, e.g. 1001	Apple	Red
        for (Text value : values) {
            // Split each tab-separated line into its fields
            String[] fields = value.toString().split("\t");

            // Build a Put object keyed by the first field (the rowkey)
            Put put = new Put(Bytes.toBytes(fields[0]));

            // Populate the info:name and info:color columns
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(fields[2]));

            context.write(NullWritable.get(), put);
        }
    }
}
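A note on the delimiter: the split must use "\t" (the escape for a tab character), not the letter t, or any value containing a t, such as Pineapple, gets shredded. A minimal demo (the class name SplitDemo is just for illustration):

public class SplitDemo {
    public static void main(String[] args) {
        // "\t" is a tab character; the TSV fields are tab-separated
        String[] fields = "1001\tApple\tRed".split("\t");
        System.out.println(fields[0] + " | " + fields[1] + " | " + fields[2]);
        // prints: 1001 | Apple | Red
    }
}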
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FruitDriver implements Tool {

    // Configuration injected via setConf by ToolRunner
    private Configuration configuration = null;

    public int run(String[] args) throws Exception {
        // 1. Get the job object
        Job job = Job.getInstance(configuration);

        // 2. Set the jar class
        job.setJarByClass(FruitDriver.class);

        // 3. Set the Mapper and its output KV types
        job.setMapperClass(FruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 4. Set the Reducer: write into the HBase table named by args[1]
        TableMapReduceUtil.initTableReducerJob(args[1],
                FruitReducer.class,
                job);

        // 5. Set the input path (args[0])
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 6. Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public void setConf(Configuration config) {
        configuration = config;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) {
        try {
            Configuration conf = HBaseConfiguration.create();
            int run = ToolRunner.run(conf, new FruitDriver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Run it locally first.
Don't panic at the console output; it is normal. As long as the exit code is 0, nothing failed.
Package it and run on the cluster:
hadoop jar hbase1602-1.0-SNAPSHOT.jar com.mr1.FruitDriver /fruit.tsv fruit1
Custom HBase-MapReduce 2
Migrate part of the data in HBase's fruit1 table into the fruit2 table.
Code
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Build a Put object for this rowkey
        Put put = new Put(key.get());

        // 1. Walk the raw cells of the row
        for (Cell cell : value.rawCells()) {
            // 2. Keep only cells whose qualifier is "name"
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                // 3. Copy the cell into the Put
                put.add(cell);
            }
        }

        // 4. Emit the rowkey and the filtered Put
        context.write(key, put);
    }
}
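Filtering cells inside the mapper works, but the Scan handed to initTableMapperJob could instead be narrowed so that only info:name cells are ever read from the RegionServers. This is the standard Scan API; the sketch below assumes the column family is info, and NameOnlyScan is a hypothetical helper:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class NameOnlyScan {
    // Build a Scan limited to the info:name column; pass it to
    // TableMapReduceUtil.initTableMapperJob in place of the bare new Scan().
    public static Scan build() {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        return scan;
    }
}

With the scan narrowed this way, the qualifier check in the mapper becomes redundant.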
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Forward every Put unchanged to the output table
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
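Because this reducer only forwards the Puts unchanged, HBase ships an equivalent built-in pass-through class, org.apache.hadoop.hbase.mapreduce.IdentityTableReducer, which could be used instead of a hand-written reducer. A sketch of the corresponding line in the driver's run() method:

// In Fruit2Driver.run(), step 4 could use the built-in pass-through reducer:
TableMapReduceUtil.initTableReducerJob(args[1],
        org.apache.hadoop.hbase.mapreduce.IdentityTableReducer.class,
        job);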
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Fruit2Driver implements Tool {

    // Configuration injected via setConf by ToolRunner
    private Configuration configuration = null;

    public int run(String[] args) throws Exception {
        // 1. Get the job object
        Job job = Job.getInstance(configuration);

        // 2. Set the jar class
        job.setJarByClass(Fruit2Driver.class);

        // 3. Set the Mapper: scan the source table (args[0]) and declare the map output KV types
        TableMapReduceUtil.initTableMapperJob(args[0],
                new Scan(),
                Fruit2Mapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);

        // 4. Set the Reducer: write into the target table (args[1])
        TableMapReduceUtil.initTableReducerJob(args[1],
                Fruit2Reducer.class,
                job);

        // 5. Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) {
        try {
            Configuration configuration = new Configuration();
            int run = ToolRunner.run(configuration, new Fruit2Driver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
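When a whole table is scanned by a MapReduce job, the HBase reference guide recommends tuning the Scan that feeds the mapper: raise scanner caching so each RPC ships more rows, and turn off block caching so a one-off full scan does not churn the RegionServer's block cache. A sketch (MrScan is a hypothetical helper, and 500 is only a common starting value, not a measured optimum):

import org.apache.hadoop.hbase.client.Scan;

public class MrScan {
    // A Scan tuned for MapReduce; hand it to initTableMapperJob instead of new Scan().
    public static Scan build() {
        Scan scan = new Scan();
        scan.setCaching(500);        // rows fetched per RPC to the RegionServer
        scan.setCacheBlocks(false);  // don't fill the block cache during the full scan
        return scan;
    }
}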
Package it and run on the cluster:

hadoop jar hbase1602-1.0-SNAPSHOT.jar com.mr2.Fruit2Driver fruit1 fruit2
Running custom HBase-MapReduce 2 from IDEA (no packaging needed)
Copy hbase-site.xml from HBase's conf directory into the resources directory of the IDEA project (an in-code alternative is sketched after these steps).
Then change Fruit2Driver slightly:
In main, use Configuration configuration = HBaseConfiguration.create();
Replace args[0] and args[1] with the literal table names.
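If copying hbase-site.xml is inconvenient, the client can instead be pointed at the cluster in code. This is only a sketch; the hostname is an assumption based on the hadoop102 machine used earlier, so substitute your own quorum:

// Alternative to a classpath hbase-site.xml: set the ZooKeeper quorum directly in main.
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "hadoop102");          // assumed host, use your quorum
configuration.set("hbase.zookeeper.property.clientPort", "2181");  // default ZooKeeper client port

Either way, the full adjusted class: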
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Fruit2Driver implements Tool {

    // Configuration injected via setConf by ToolRunner
    private Configuration configuration = null;

    public int run(String[] args) throws Exception {
        // 1. Get the job object
        Job job = Job.getInstance(configuration);

        // 2. Set the jar class
        job.setJarByClass(Fruit2Driver.class);

        // 3. Set the Mapper: scan the source table and declare the map output KV types
        TableMapReduceUtil.initTableMapperJob("fruit1",
                new Scan(),
                Fruit2Mapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);

        // 4. Set the Reducer: write into the target table
        TableMapReduceUtil.initTableReducerJob("fruit2",
                Fruit2Reducer.class,
                job);

        // 5. Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) {
        try {
            // Loads hbase-site.xml from the classpath (the resources directory)
            Configuration configuration = HBaseConfiguration.create();
            int run = ToolRunner.run(configuration, new Fruit2Driver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
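Before launching the job from IDEA, a quick connectivity check can save time. This small sketch (HBasePing is just an illustrative name) assumes the fruit1 table already exists and that hbase-site.xml is on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HBasePing {
    public static void main(String[] args) throws Exception {
        // Picks up hbase-site.xml from the resources directory on the classpath
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
            boolean exists = connection.getAdmin()
                    .tableExists(TableName.valueOf("fruit1"));
            System.out.println("fruit1 exists: " + exists);
        }
    }
}

If this prints "fruit1 exists: true", the Fruit2Driver main method should reach the cluster the same way.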