Spark连接Mysql与hive
连接Mysql
导入依赖
复制代码
1
2
3
4
5
6<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.21</version> </dependency>
获取SparkSession对象
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82package com.qf.sql.day03 import java.util.Properties import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} /** * 演示SparkSql如何加载文件 */ object _03TestMysql { def main(args: Array[String]): Unit = { write read } def write: Unit = { val spark = SparkSession.builder() .appName("testload") .master("local[*]") .getOrCreate() import spark.implicits._ /* * 写入mysql中 * jdbc(url: String, table: String, properties: Properties) * * url:用于指定连接路径 * table: 指定表名 * properties:用于指定连接参数的其他属性,比如,用户名,密码等 */ val df: DataFrame = spark.read .csv("sql/country.csv") .toDF("id","country","code") val prop = new Properties() prop.put("user","root") prop.put("password","123456") prop.put("driver","com.mysql.cj.jdbc.Driver") /** * 写出时的模式研究: * SaveMode.Append 追加 ,注意主键的问题,如果有主键,可能会报错 * SaveMode.ErrorIfExists 如果存在就提示报错, 是默认模式 * SaveMode.Ignore 忽略 , 如果已经存在,则不插入 * SaveMode.Overwrite 覆盖, 如果已经存在,则删除,重新创建 */ df.write.mode(SaveMode.Ignore) .jdbc("jdbc:mysql://10.36.140.103:3306/mydb2?serverTimezone=UTC","country",prop) spark.stop() } def read: Unit = { val spark = SparkSession.builder() .appName("testload") .master("local[*]") .getOrCreate() import spark.implicits._ /* * 读取mysql中的表数据 * jdbc(url: String, table: String, properties: Properties) * * url:用于指定连接路径 * table: 指定表名 * properties:用于指定连接参数的其他属性,比如,用户名,密码等 */ val prop = new Properties() prop.put("user","root") prop.put("password","123456") prop.put("driver","com.mysql.cj.jdbc.Driver") val df: DataFrame = spark.read .jdbc("jdbc:mysql://10.36.140.103:3306/mydb2?serverTimezone=UTC", "country", prop) //df.show() println(df.count()) spark.stop() } }
SparkSQL连接hive
- 将hive-site.xml拷贝到spark的conf目录下
- 将core-site.xml和hdfs-site.xml拷贝到spark的conf目录下
- 将mysql的驱动包,拷贝到spark的jars目录下
3.4.3 代码连接hive
1)读取hive中的表
复制代码
1
2
3
4
51. 开启hive支持 2. 将hive-site.xml以及core-site.xml,hdfs-site.xml拷贝到resources目录下 3. 读取数据: spark.read.table("库名.表名")
2)向hive中写数据
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
151. 开启hive支持 2. 将hive-site.xml以及core-site.xml,hdfs-site.xml拷贝到resources目录下 3. 要写的数据,可以来源于内存,也可以来源于外部文件 注意: --如果是来源于外部文件,可能出现路径问题,默认读取的是hdfs上的文件,如果想要读取本地文件,需要添加file:///绝对路径 --也可能出现权限文件,可以使用如下方法: System.setProperty("HADOOP_USER_NAME","root") 4. spark.write.saveAstable("库名.表名") 小贴士: 如果指定模式,只能使用下面几个 SaveMode.Append 默认模式 SaveMode.Overwrite SaveMode.Ignore 依然使追加操作
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20object _04TestHive { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME","root") val spark: SparkSession = SparkSession.builder().master("local[*]").appName("testload").enableHiveSupport().getOrCreate() import spark.implicits._ //val df: DataFrame = spark.read.csv("file:///D:\IDEAfile\SparkSql\sql\country.csv").toDF("id", "country", "code") // df.write.saveAsTable("mydb2.country1") val df: DataFrame = spark.read.table("mydb2.country") df.show() spark.stop() } }
最后
以上就是无私星星最近收集整理的关于Spark连接Mysql与Hive的全部内容,更多相关Spark连接Mysql与Hive内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复