我是靠谱客的博主 矮小奇迹,这篇文章主要介绍第二章:《Spark之-----RDD编程》,现在分享给大家,希望可以做个参考。

一、Spark系统

spark相当于是一个优化了Hadoop里面MapReduce的程序,它主要以scala编程为基础。
在这里插入图片描述
在实际应用中,大数据处理主要包括以下几个类型:
 复杂的批量数据处理:时间跨度通常在数十分钟到数小时之间;(主要用Spark Core来实现)
 基于历史数据的交互式查询:时间跨度通常在数十秒到数分钟之间;(主要用Spark SQL来实现)
 基于实时数据流的数据处理:时间跨度通常在数百毫秒到数秒之间。(主要用Spark Streaming和Structured Streaming来实现)
 基于历史数据的挖掘:(主要用MLlib来实现)
 图结构的处理:(主要用GraphX来实现)



二、是spark的简易执行流程(以standalone-client模式为例)

在这里插入图片描述
1、加载主类,通过反射调用指定类的main方法;

2、创建SparkContext,与Master进行通信申请资源;

3、Master与Worker通信,启动executor;

4、Worker中启动executor,并创建线程池;

5、executor向Driver反向注册;

6、创建原始RDD;

val lines = sc.textFile(“hdfs://linux02:9000/data”)

7、调用RDD中的Transformation(s)算子;

val result = lines.filter(!.startsWith(“java”)).map(.toUpperCase())

8、调用RDD的Action算子saveAsTextFile(Action底层调用sc.runJob);

result.saveAsTextFile(“hdfs://linux02:9000/output”)

9、构建DAG,根据shuffle切分stage,生成taskSet;

task是类的实例,这个类是根据调用的RDD的方法传入的函数生成的(有属性:记录读取那个切片的数据;有方法:如何对数据进行运算)

10、TastScheduler将task序列化后,通过网络发送给executor;

11、executor接收task后,进行反序列化,然后用实现runnable接口的包装类包装这一层,最后提交到线程池中;

读数据是以迭代器的形式边读边计算的。

三、RDD编程基础

一、从文件系统中加载数据创建RDD
1.读取本地文件:

复制代码
1
2
3
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[12] at textFile at <console>:27

"file:///"表示读取本地的文件,而不是读取HDFS里的文件
2.读取HDFS里面的文件
把刚才在本地文件系统中的“/usr/local/spark/mycode/rdd/word.txt”上传到HDFS文件系统的hadoop用户目录下就可以使用下面任意一条命令完成从HDFS文件系统中加载数据:

复制代码
1
2
scala> val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")

此时用的是"file://"
在这里插入图片描述



二、通过并行集合(数组)创建RDD
从数组中创建:

复制代码
1
2
3
4
5
6
scala>val array = Array(1,2,3,4,5) array: Array[Int] = Array(1, 2, 3, 4, 5) scala>val rdd = sc.parallelize(array) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:29

或者,也可以从列表中创建:

复制代码
1
2
3
4
5
6
scala>val list = List(1,2,3,4,5) list: List[Int] = List(1, 2, 3, 4, 5) scala>val rdd = sc.parallelize(list) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29


三、RDD操作
RDD被创建好以后,在后续使用过程中一般会发生两种操作:

  •  转换(Transformation): 基于现有的数据集创建一个新的数据集。
  •  行动(Action):在数据集上进行运算,返回计算值。
    RDD操作中的“转换”是会把每个行为记录下来但并不会真的对它进行计算,只有RDD“行动”才会将那些操作进行批量运算
    1.转换操作:
    对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
  • filter(func):筛选出满足函数func的元素,并返回一个新的数据集
  • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
  • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
  • groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
  • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
复制代码
1
2
3
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") scala> lines.map(line => line.split(" ").size).reduce((a,b) => if (a>b) a else b)

reduce()操作每次接收两个参数,取出较大者留下,然后再继续比较直到留出最大值。

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

2.行动操作
行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
下面列出一些常见的行动操作(Action API):

  • count() 返回数据集中的元素个数
  • collect() 以数组的形式返回数据集中的所有元素
  • first() 返回数据集中的第一个元素
  • take(n) 以数组的形式返回数据集中的前n个元素
  • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
  • foreach(func) 将数据集中的每个元素传递到函数func中运行*
    在这里插入图片描述
    在这里插入图片描述
    3.惰性机制
复制代码
1
2
3
4
scala> val lines = sc.textFile("data.txt") scala> val lineLengths = lines.map(s => s.length) scala> val totalLength = lineLengths.reduce((a, b) => a + b)

上面第一行首先从外部文件data.txt中构建得到一个RDD,名称为lines,但是,由于textFile()方法只是一个转换操作,因此,这行代码执行后,不会立即把data.txt文件加载到内存中

第二行代码用来计算每行的长度(即每行包含多少个单词),同样,由于map()方法只是一个转换操作,这行代码执行后,不会立即计算每行的长度。

第三行代码的reduce()方法是一个“动作”类型的操作,这时,就会触发真正的计算。这时,Spark会把计算分解成多个任务在不同的机器上执行,每台机器运行位于属于它自己的map和reduce,最后把结果返回给Driver Program。



四、持久化

惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果需要多次调用不同的行动操作,这就意味着,每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。

复制代码
1
2
3
4
5
6
7
8
9
scala> val list = List("Hadoop","Spark","Hive") list: List[String] = List(Hadoop, Spark, Hive) scala> val rdd = sc.parallelize(list) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29 scala> println(rdd.count()) //行动操作,触发一次真正从头到尾的计算 3 scala> println(rdd.collect().mkString(",")) //行动操作,触发一次真正从头到尾的计算 Hadoop,Spark,Hive

上面代码执行过程中,前后共触发了两次从头到尾的计算。

可以使用persist()方法对一个RDD标记为持久化,等到遇到第一个行动操作触发真正计算以后,会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用。

persist()的圆括号中包含的是持久化级别参数,比如,

  • persist(MEMORY_ONLY)如果使用这条语句,内存不足时,就要按照LRU原则替换缓存中的内容。
  • persist(MEMORY_AND_DISK)如果内存不足,超出的分区将会被存放在硬盘上。
  • RDD.cache()=RDD.persist(MEMORY_ONLY)

例子如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
scala> val list = List("Hadoop","Spark","Hive") list: List[String] = List(Hadoop, Spark, Hive) scala> val rdd = sc.parallelize(list) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29 scala> rdd.cache() //会调用persist(MEMORY_ONLY) scala> println(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中 3 scala> println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd Hadoop,Spark,Hive

最后,可以使用unpersist()方法手动地把持久化的RDD从缓存中移除。



五、分区
分区就是为了使得数据存储的更加的规范,方便管理。
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。
RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。
*本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;
*Apache Mesos:默认的分区数为8;
*Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值;

复制代码
1
2
3
4
5
scala>val array = Array(1,2,3,4,5) array: Array[Int] = Array(1, 2, 3, 4, 5) scala>val rdd = sc.parallelize(array,2) #设置两个分区 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:29

现在我们可以人为的进行重分区:
在这里插入图片描述



词频统计实例
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述



六、打印元素
一般会采用语句rdd.foreach(println)或者rdd.map(println)。
当采用集群模式执行时rdd.collect().foreach(println)来打印,或者rdd.take(100).foreach(println)这种抓取前100条打印。



三、键值对RDD

.一、键值对RDD的创建
①第一种创建方式:从文件中加载

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt") lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/pairrdd/word.txt MapPartitionsRDD[1] at textFile at <console>:27 scala> val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1)) pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:29 scala> pairRDD.foreach(println) (i,1) (love,1) (hadoop,1) (i,1) (love,1) (Spark,1) (Spark,1) (is,1) (fast,1) (than,1) (hadoop,1)

②通过并行集合来创建键值对RDD

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala> val list = List("Hadoop","Spark","Hive","Spark") list: List[String] = List(Hadoop, Spark, Hive, Spark) scala> val rdd = sc.parallelize(list) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:29 scala> val pairRDD = rdd.map(word => (word,1)) pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at <console>:31 scala> pairRDD.foreach(println) (Hadoop,1) (Spark,1) (Hive,1) (Spark,1)

二、常用的键值对转换操作
常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等
1.reduceByKey(func)

复制代码
1
2
3
4
5
6
//有四个键值对(“spark”,1)、(“spark”,1)、(“hadoop”,1)和(“hadoop”,1),对具有相同key的键值对进行合并后的结果就是:(“spark”,2)、(Hive,1)、(“hadoop”,1) scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println) (Spark,2) (Hive,1) (Hadoop,1)

2.groupByKey()

复制代码
1
2
3
4
5
6
7
8
9
//四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5)) scala> pairRDD.groupByKey() res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[15] at groupByKey at <console>:34 //从上面执行结果信息中可以看出,分组后,value被保存到Iterable[Int]中 scala> pairRDD.groupByKey().foreach(println) (Spark,CompactBuffer(1, 1)) (Hive,CompactBuffer(1)) (Hadoop,CompactBuffer(1))

3.keys
会把键值对RDD中的key返回形成一个新的RDD。对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用keys后得到的结果是一个RDD[Int],内容是{“spark”,”spark”,”hadoop”,”hadoop”}。

复制代码
1
2
3
4
5
6
7
8
scala> pairRDD.keys res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at keys at <console>:34 scala> pairRDD.keys.foreach(println) Hadoop Spark Hive Spark

4.values
会把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用keys后得到的结果是一个RDD[Int],内容是{1,2,3,5}。

复制代码
1
2
3
4
5
6
7
8
9
scala> pairRDD.values res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at values at <console>:34 scala> pairRDD.values.foreach(println) 1 1 1 1

5.sortByKey()

复制代码
1
2
3
4
5
6
7
8
9
scala> pairRDD.sortByKey() res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at sortByKey at <console>:34 scala> pairRDD.sortByKey().foreach(println) (Hadoop,1) (Hive,1) (Spark,1) (Spark,1) //按字母升序

6.sortBy()

复制代码
1
2
3
4
scala>val d1=sc.parllelize(Array(('c',8),('c',17),('a',42),('b',4),('d',9),('e',17),('f',29),('g',21),('b',9))) scala>d1.reduceByKey(_+_).sortByKey(false).collect res:Array[(String,Int)]=Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))
复制代码
1
2
3
4
5
scala>val d1=sc.parllelize(Array(('c',8),('c',17),('a',42),('b',4),('d',9),('e',17),('f',29),('g',21),('b',9))) scala>d1.reduceByKey(_+_).sortBy(_._2,false).collect res:Array[(String,Int)]=Array((a,42),(b,38),(f,29),(c,27),(g,21),(e,17),(d,9)) //根据value进行降序排列

7.mapValues(func)
它的功能是,对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。

复制代码
1
2
3
4
5
6
7
8
9
//比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的pairRDD,如果执行pairRDD.mapValues(x => x+1),就会得到一个新的键值对RDD,它包含下面四个键值对(“spark”,2)、(“spark”,3)、(“hadoop”,4)和(“hadoop”,6)。 scala> pairRDD.mapValues(x => x+1) res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at mapValues at <console>:34 scala> pairRDD.mapValues(x => x+1).foreach(println) (Hadoop,2) (Spark,2) (Hive,2) (Spark,2)

8.join
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。
pairRDD1是一个键值对集合{(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)},pairRDD2是一个键值对集合{(“spark”,”fast”)},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{(“spark”,1,”fast”),(“spark”,2,”fast”)}。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5))) pairRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:27 scala> val pairRDD2 = sc.parallelize(Array(("spark","fast"))) pairRDD2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[25] at parallelize at <console>:27 scala> pairRDD1.join(pairRDD2) res9: org.apache.spark.rdd.RDD[(String, (Int, String))] = MapPartitionsRDD[28] at join at <console>:32 scala> pairRDD1.join(pairRDD2).foreach(println) (spark,(1,fast)) (spark,(2,fast))

9.一个综合实例
题目:给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。

复制代码
1
2
3
4
5
6
scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6))) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:27 scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect() res22: Array[(String, Int)] = Array((spark,4), (hadoop,5))

在这里插入图片描述



四、文件系统的数据读写

1.本地文件系统的数据读写

复制代码
1
2
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")

上面代码中,val后面的是变量textFile,而sc.textFile()中的这个textFile是sc的一个方法名称,这个方法用来加载文件数据。
注意,要加载本地文件,必须采用“file:///”开头的这种格式。执行上上面这条命令以后,并不会马上显示结果,因为,Spark采用惰性机制,只有遇到“行动”类型的操作,才会从头到尾执行所有操作。

复制代码
1
2
scala> textFile.first()

first()是一个“行动”(Action)类型的操作,会启动真正的计算过程

正因为Spark采用了惰性机制,在执行转换操作的时候,即使我们输入了错误的语句,spark-shell也不会马上报错,而是等到执行“行动”类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来,比如:

复制代码
1
2
val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")

上面我们使用了一个根本就不存在的word123.txt,执行上面语句时,spark-shell根本不会报错,因为,没有遇到“行动”类型的first()操作之前,这个加载操作时不会真正执行的。

复制代码
1
2
scala> textFile.first()

执行上面语句后,你会发现,会返回错误信息,其中有四个醒目的中文文字“拒绝连接”,因为,这个word123.txt文件根本就不存在。

练习一下如何把textFile变量中的内容再次写回到另外一个文本文件wordback.txt中:

复制代码
1
2
3
4
5
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt") scala> textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback") //注意,写的时候是指定一个目录,不是一个具体文件,你可以写成textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback.txt")但生成的依旧是一个目录 //写入的这个目录下会包含一些文件,这些文件就是被分区后的数据,如果我们读取一个目录时会读取这个目录下所有的文件(也就是这个目录下所有的数据)

我们如果想再次把数据加载在RDD中

复制代码
1
2
3
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/writeback.txt") //我们知道weiteback是一个目录,读取一个目录时会读取这个目录下所有的文件(也就是这个目录下所有的数据)

2.分布式文件系统HDFS的数据读写
你注册一个用户名时就会有一个“/user/用户名”的目录,假如使用用户名hadoop登录Linux系统就会有“/user/hadoop”目录。
再次强调,这个目录是在HDFS文件系统中,不在本地文件系统中。

我们在Hadoop的hdfs中创建了一个word.txt文件
编写语句从HDFS中加载word.txt文件,并显示第一行文本内容:

复制代码
1
2
3
4
scala> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") scala> textFile.first() //执行上面语句后,就可以看到HDFS文件系统中(不是本地文件系统)的word.txt的第一行内容了。

下面,我们再把textFile的内容写回到HDFS文件系统中(写到hadoop用户目录下):

复制代码
1
2
3
4
scala> val textFile = sc.textFile("word.txt") scala> textFile.saveAsTextFile("writeback.txt") //执行上面命令后,文本内容会被写入到HDFS文件系统的“/user/hadoop/writeback”目录下,该目录下会被生成很多分区的文件存储着各种数据,我们需要再次把writeback.txt中的内容加载到RDD中时,只需要加载这个目录就会加载该目录下所有的文件

当需要再次把writeback.txt中的内容加载到RDD中时:

复制代码
1
2
3
scala> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/writeback.txt") //如果我们给textFile()函数传递的不是文件名,而是一个目录,则该目录下的所有文件内容都会被读取到RDD中。

3.json文件的读取
在这里插入图片描述

复制代码
1
2
3
4
5
6
scala> val jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json") scala> jsonStr.foreach(println) {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}

从上面执行结果可以看出,people.json文件加载到RDD中以后,在RDD中存在三个字符串。我们下面要做的事情,就是把这三个JSON格式的字符串解析出来,比如说,第一个字符串{“name”:”Michael”},经过解析后,解析得到key是”name”,value是”Michael”。现在我们编写程序完成对上面字符串的解析工作。

Scala中有一个自带的JSON库——scala.util.parsing.json.JSON,可以实现对JSON数据的解析。这个库提供了JSON.parseFull(jsonString:String)函数,以一个JSON字符串作为输入并进行解析,如果解析成功则返回一个Some(map: Map[String, Any]),如果解析失败则返回None。

所以,编写如下代码,来解析json文件,转换为rdd

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import scala.util.parsing.json.JSON object JSONApp { def main(args: Array[String]) { val inputFile = "file:///usr/local/spark/examples/src/main/resources/people.json" val conf = new SparkConf().setAppName("JSONApp") //设置了一个SparkConf对象 val sc = new SparkContext(conf) //生成了SparkContext对象 val jsonStrs = sc.textFile(inputFile) //把文件加载进来生成了一个RDD,RDD里面的每个元素就是那个json文件的一行 val result = jsonStrs.map(s => JSON.parseFull(s)) //对RDD里的每个元素进行解析,如果解析成功则返回一个Some(map: Map[String, Any]),如果解析失败则返回None。 result.foreach( {r => r match { case Some(map: Map[String, Any]) => println(map) case None => println("Parsing failed") case other => println("Unknown data structure: " + other) } } ) } } 输出的结果: Map(name -> Michael) Map(name -> Andy, age -> 30.0) Map(name -> Justin, age -> 19.0)

4.读取hbase数据
在这里插入图片描述
在这里插入图片描述
①.启动hbase,创建hbase表
在这里插入图片描述

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
hbase> create 'student','info' //首先录入student表的第一个学生记录 hbase> put 'student','1','info:name','Xueqian' //先锁定行键“1”,再锁定列族“name”,往单元格里面插入值 hbase> put 'student','1','info:gender','F' hbase> put 'student','1','info:age','23' //然后录入student表的第二个学生记录 hbase> put 'student','2','info:name','Weiliang' hbase> put 'student','2','info:gender','M' hbase> put 'student','2','info:age','24' //如果每次只查看一行,就用下面命令 hbase> get 'student','1' //如果每次查看全部数据,就用下面命令 hbase> scan 'student'

在这里插入图片描述
②编写程序读取HBase数据
如果要让Spark读取HBase,就需要使用SparkContext提供的newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SparkOperateHBase { def main(args: Array[String]) { val conf = HBaseConfiguration.create() val sc = new SparkContext(new SparkConf()) //设置查询的表名 conf.set(TableInputFormat.INPUT_TABLE, "student") val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result]) val count = stuRDD.count() println("Students RDD Count:" + count) stuRDD.cache() //遍历输出 stuRDD.foreach({ case (_,result) => val key = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes)) val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes)) val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes)) println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age) }) } } 结果: Students RDD Count:2 Row key:1 Name:Xueqian Gender:F Age:23 Row key:2 Name:Weiliang Gender:M Age:24

③编写程序向HBase写入数据

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.spark._ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes object SparkWriteHBase { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local") val sc = new SparkContext(sparkConf) val tablename = "student" sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename) val job = new Job(sc.hadoopConfiguration) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //构建两行记录 val rdd = indataRDD.map(_.split(',')).map{arr=>{ val put = new Put(Bytes.toBytes(arr(0))) //行健的值 put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) //info:name列的值 put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2))) //info:gender列的值 put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt)) //info:age列的值 (new ImmutableBytesWritable, put) }} rdd.saveAsNewAPIHadoopDataset(job.getConfiguration()) } }
复制代码
1
2
hbase> scan 'student'

在这里插入图片描述



五、RDD的与原理


在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

最后

以上就是矮小奇迹最近收集整理的关于第二章:《Spark之-----RDD编程》的全部内容,更多相关第二章内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(70)

评论列表共有 0 条评论

立即
投稿
返回
顶部