Java version:
package com.bjsxt.sparksql.dataframe;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * Create a DataFrame from an RDD of JSON strings.
 * @author root
 */
public class CreateDFFromJsonRDD {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("jsonRDD");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Each element of the RDD is one JSON document.
        JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList(
            "{\"name\":\"zhangsan\",\"age\":\"18\"}",
            "{\"name\":\"lisi\",\"age\":\"19\"}",
            "{\"name\":\"wangwu\",\"age\":\"20\"}"
        ));
        JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
            "{\"name\":\"zhangsan\",\"score\":\"100\"}",
            "{\"name\":\"lisi\",\"score\":\"200\"}",
            "{\"name\":\"wangwu\",\"score\":\"300\"}"
        ));

        DataFrame namedf = sqlContext.read().json(nameRDD);
        namedf.show();
        DataFrame scoredf = sqlContext.read().json(scoreRDD);
        scoredf.show();

        // SELECT t1.name, t1.age, t2.score FROM t1, t2 WHERE t1.name = t2.name
        // Using the native DataFrame API instead of SQL
        // ($eq$eq$eq is the Java-visible name of Scala's === column operator):
        // namedf.join(scoredf, namedf.col("name").$eq$eq$eq(scoredf.col("name")))
        //     .select(namedf.col("name"), namedf.col("age"), scoredf.col("score")).show();

        // Register both DataFrames as temporary tables and query with SQL.
        namedf.registerTempTable("name");
        scoredf.registerTempTable("score");

        /**
         * The columns of the DataFrame produced by a hand-written SQL query
         * come back in the order the fields are listed in the query.
         */
        DataFrame result = sqlContext.sql("select name.name,name.age,score.score "
                + "from name join score "
                + "on name.name = score.name");
        result.show();

        sc.stop();
    }
}
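For reference, with the three sample rows above the final result.show() should print something like the following (exact column widths come from Spark's formatting, and row order after a join is not guaranteed):

+--------+---+-----+
|    name|age|score|
+--------+---+-----+
|zhangsan| 18|  100|
|    lisi| 19|  200|
|  wangwu| 20|  300|
+--------+---+-----+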
Scala version:
val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonrdd")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Each element of the RDD is one JSON document; note age and score
// are numbers here, unlike the string values in the Java version.
val nameRDD = sc.makeRDD(Array(
  "{\"name\":\"zhangsan\",\"age\":18}",
  "{\"name\":\"lisi\",\"age\":19}",
  "{\"name\":\"wangwu\",\"age\":20}"
))
val scoreRDD = sc.makeRDD(Array(
  "{\"name\":\"zhangsan\",\"score\":100}",
  "{\"name\":\"lisi\",\"score\":200}",
  "{\"name\":\"wangwu\",\"score\":300}"
))

val nameDF = sqlContext.read.json(nameRDD)
val scoreDF = sqlContext.read.json(scoreRDD)

// Register as temporary tables and join with SQL.
nameDF.registerTempTable("name")
scoreDF.registerTempTable("score")
val result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name")
result.show()
sc.stop()
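The Scala version only shows the SQL route. As a minimal sketch (untested, using the nameDF and scoreDF values defined above), the native DataFrame API join that the Java version leaves commented out reads much more naturally in Scala, since === is directly available on Column:

// Same join expressed with the DataFrame API instead of SQL.
nameDF.join(scoreDF, nameDF("name") === scoreDF("name"))
  .select(nameDF("name"), nameDF("age"), scoreDF("score"))
  .show()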