I'm 害羞往事, a blogger at 靠谱客. This post covers reading from and writing to MySQL in a Hadoop MapReduce job; I'm sharing it here in the hope that it makes a useful reference.

Use DBWritable to read from and write to MySQL.

create database big4;
use big4;

create table words(id int primary key auto_increment, name varchar(20), txt varchar(255));

insert into words(name,txt) values('tomas','hello world tom');
insert into words(name,txt) values('tomas1','hello tom world');
insert into words(name,txt) values('tomas2','world hello tom');
insert into words(name,txt) values('tomas3','world tom hello');

create table stats(word varchar(50), c int);

Write the Hadoop MyDBWritable class.
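It implements two interfaces: Writable, so Hadoop can serialize it between map and reduce, and DBWritable, so DBInputFormat/DBOutputFormat can move it through JDBC. For orientation, the DBWritable contract in org.apache.hadoop.mapreduce.lib.db has exactly these two methods:

// org.apache.hadoop.mapreduce.lib.db.DBWritable (shown for reference)
public interface DBWritable {
    // Fill in the placeholders of the INSERT statement when a record is written to the database.
    void write(PreparedStatement statement) throws SQLException;
    // Populate the object's fields from the current row of the input query's ResultSet.
    void readFields(ResultSet resultSet) throws SQLException;
}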

package com.mao.hdfs.mysql;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MyDBWritable implements DBWritable, Writable {

    // Fields read from the words table
    private int id;
    private String name;
    private String txt;

    // Fields written to the stats table
    private String word;
    private int c;

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }

    public int getC() { return c; }
    public void setC(int c) { this.c = c; }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getTxt() { return txt; }
    public void setTxt(String txt) { this.txt = txt; }

    /**
     * Write to the DB: fills the placeholders of the INSERT into stats(word, c).
     * @param ppst
     * @throws SQLException
     */
    public void write(PreparedStatement ppst) throws SQLException {
        //ppst.setInt(1, id);
        //ppst.setString(2, name);
        //ppst.setString(3, txt);
        ppst.setString(1, word);
        ppst.setInt(2, c);
    }

    /**
     * Read from the DB: one row of the input query over the words table.
     * @param rs
     * @throws SQLException
     */
    public void readFields(ResultSet rs) throws SQLException {
        id = rs.getInt(1);
        name = rs.getString(2);
        txt = rs.getString(3);
    }

    /**
     * Hadoop serialization: write the fields.
     * @param out
     * @throws IOException
     */
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
        out.writeUTF(txt);
        out.writeUTF(word);
        out.writeInt(c);
    }

    /**
     * Hadoop serialization: read the fields back in the same order.
     * @param in
     * @throws IOException
     */
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
        txt = in.readUTF();
        word = in.readUTF();
        c = in.readInt();
    }
}
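One aside: readFields(ResultSet) above depends on the column positions of the input query (select * from words). If you prefer not to rely on column order, the same method can read by column name instead; a minimal sketch, assuming the query returns the id, name and txt columns:

public void readFields(ResultSet rs) throws SQLException {
    // Read by column name rather than position, so the mapping
    // survives reordering of the SELECT column list.
    id = rs.getInt("id");
    name = rs.getString("name");
    txt = rs.getString("txt");
}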

Mapper class:

package com.mao.hdfs.mysql;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WCMapper extends Mapper<LongWritable, MyDBWritable, Text, IntWritable> {

    protected void map(LongWritable key, MyDBWritable value, Context context)
            throws IOException, InterruptedException {
        System.out.println(key);
        // Each input record is one row of the words table.
        String line = value.getTxt();
        System.out.println(value.getId() + "," + value.getName());
        String[] arr = line.split(" ");
        for (String s : arr) {
            context.write(new Text(s), new IntWritable(1));
        }
    }
}
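Not required for correctness, but a common MapReduce idiom is to reuse the output objects instead of allocating new ones per token; an equivalent sketch of the map body:

private final Text outKey = new Text();
private final IntWritable one = new IntWritable(1);

protected void map(LongWritable key, MyDBWritable value, Context context)
        throws IOException, InterruptedException {
    for (String s : value.getTxt().split(" ")) {
        outKey.set(s);               // reuse the same Text instance for every token
        context.write(outKey, one);  // the framework copies key/value when writing
    }
}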

Reducer class:

package com.mao.hdfs.mysql;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WCReducer extends Reducer<Text, IntWritable, MyDBWritable, NullWritable> {

    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for this word.
        int count = 0;
        for (IntWritable iw : values) {
            count += iw.get();
        }
        // The output key is a MyDBWritable; DBOutputFormat calls its
        // write(PreparedStatement) to insert (word, c) into the stats table.
        MyDBWritable keyOut = new MyDBWritable();
        keyOut.setWord(key.toString());
        keyOut.setC(count);
        context.write(keyOut, NullWritable.get());
    }
}

App (driver) class:

package com.mao.hdfs.mysql;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class MysqlApp {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(conf);

        // Job properties
        job.setJobName("MysqlApp");             // job name
        job.setJarByClass(MysqlApp.class);      // jar containing this class

        // Database connection settings
        String driverClass = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://192.168.0.21:3306/big4";
        String username = "bgm";
        String password = "12311";
        DBConfiguration.configureDB(job.getConfiguration(), driverClass, url, username, password);

        // Input: each row of the query becomes one MyDBWritable record;
        // the count query tells Hadoop how many records there are so it can split the input.
        DBInputFormat.setInput(job, MyDBWritable.class,
                "select * from words", "select count(*) from words");

        // Output: write the results into the stats(word, c) table.
        DBOutputFormat.setOutput(job, "stats", "word", "c");
        //FileOutputFormat.setOutputPath(job, new Path("d:/mr/sql/out"));   // file output, not used here

        job.setMapperClass(WCMapper.class);     // mapper class
        job.setReducerClass(WCReducer.class);   // reducer class
        job.setNumReduceTasks(3);               // number of reducers

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(MyDBWritable.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}
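One practical note: DBOutputFormat.setOutput(job, "stats", "word", "c") makes the job issue an INSERT INTO stats (word,c) VALUES (?,?) statement, with the placeholders filled by MyDBWritable.write(PreparedStatement). For the tasks to reach MySQL at all, the MySQL JDBC driver must be on the task classpath; one option (a sketch, assuming the connector jar has already been copied to an HDFS path of your choosing, so both the path and the version below are placeholders) is to add it in the driver before submitting the job:

// In MysqlApp.main(), before job.waitForCompletion(true)
// (requires: import org.apache.hadoop.fs.Path;)
// Ship the MySQL connector with the job; adjust the HDFS path and version to your environment.
job.addFileToClassPath(new Path("/libs/mysql-connector-java-5.1.47.jar"));

After the job finishes, the word counts can be checked directly in MySQL by querying the stats table.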

 
