1.持久化
Spark持久化过程包括persist、cache、upersist3个操作
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist() /** * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. * * @param blocking Whether to block until all blocks are deleted. * @return This RDD. */ def unpersist(blocking: Boolean = true): this.type = { logInfo("Removing RDD " + id + " from persistence list") sc.unpersistRDD(id, blocking) storageLevel = StorageLevel.NONE this }
cache方法等价于StorageLevel.MEMORY_ONLY的persist方法,而persist方法也仅仅是简单修改了当前RDD的存储级别而已,SparkContext中维护了一张哈希表persistRdds,用于登记所有被持久化的RDD,执行persist操作是,会将RDD的编号作为键,把RDD记录到persistRdds表中,unpersist函数会调用SparkContext对象的unpersistRDD方法,除了将RDD从哈希表persistRdds中移除之外,该方法还会将该RDD中的分区对于的所有块从存储介质中删除。
如下给出持久化的类型
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false)
复制代码
1
2
3
4
5
6
7class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1) extends Externalizable
2.检查点
检查点机制的实现和持久化的实现有着较大的区别。检查点并非第一次计算就将结果进行存储,而是等到一个作业结束后启动专门的一个作业完成存储的操作。
checkPoint操作的实现在RDD类中,
checkPoint方法会实例化ReliableRDDCheckpointData用于标记当前的RDD
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14/** * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint * directory set with `SparkContext#setCheckpointDir` and all references to its parent * RDDs will be removed. This function must be called before any job has been * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */ def checkpoint(): Unit = RDDCheckpointData.synchronized { if (context.checkpointDir.isEmpty) { throw new SparkException("Checkpoint directory has not been set in the SparkContext") } else if (checkpointData.isEmpty) { checkpointData = Some(new ReliableRDDCheckpointData(this)) } }
RDDCheckpointData类内部有一个枚举类型
CheckpointState
复制代码
用于表示RDD检查点的当前状态,其值有Initialized 、CheckpointingInProgress、 checkpointed。其转换过程如下
1
2
3
4
5
6
7
8
9/** * Enumeration to manage state transitions of an RDD through checkpointing * [ Initialized --> checkpointing in progress --> checkpointed ]. */ private[spark] object CheckpointState extends Enumeration { type CheckpointState = Value val Initialized, CheckpointingInProgress, Checkpointed = Value }
(1)Initialized状态
该状态是实例化ReliableRDDCheckpointData后的默认状态,用于标记当前的RDD已经建立了检查点(较v1.4.x少一个MarkForCheckPiont状态)
(2)CheckpointingInProgress状态
每个作业结束后都会对作业的末RDD调用其doCheckPoint方法,该方法会顺着RDD的关系依赖链往前遍历,直到遇见内部RDDCheckpointData对象被标记为Initialized的为止,此时将RDD的RDDCheckpointData对象标记为CheckpointingInProgress,并启动一个作业完成数据的写入操作。
(3)Checkpointed状态
新启动作业完成数据写入操作之后,将建立检查点的RDD的所有依赖全部清除,将RDD内部的RDDCheckpointData对象标记为Checkpointed,将父RDD重新设置为一个CheckPointRDD对象,
父RDD的compute方法会直接从系统中读取数据。
如上只简单地介绍了相关概念,详细介绍请参看:
https://github.com/JerryLead/SparkInternals/blob/master/markdown/6-CacheAndCheckpoint.md
最后
以上就是年轻小蝴蝶最近收集整理的关于Spark基础随笔:持久化&检查点的全部内容,更多相关Spark基础随笔内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复