Spark内核设计的艺术: 第6章 存储体系
shilinlee opened this issue · 0 comments
6.1 存储体系概述
简单来讲,Spark存储体系是各个Driver与Executor实例中的BlockManager
所组成的;但是从一个整体来看,把各个节点的BlockManager
看成存储体系的一部分,那存储体系就有了更多衍生的内容,比如块传输服务、map任务输出跟踪器、Shuffle管理器等。
BlockManagerMaster
:代理BlockManager
与Driver上的BlockManagerMasterEndpoint
通信。记号①表示Executor节点上的BlockManager
通过BockManagerMaster
与BlockManagerMasterEndpoint
进行通信,记号②表示Driver节点上的BlockManager
通过BlockManagerMaster
与BlockManagerMasterEndpoint
进行通信。这些通信的内容有很多,例如,注册BlockManager
、更新Block信息、获取Block的位置(即Block所在的BlockManager
)、删除Executor等。BlockManagerMaster
之所以能够和BlockManagerMasterEndpoint
通信,是因为它持有了BlockManagerMasterEndpoint
的RpcEndpointRef
。BlockManagerMasterEndpoint
:由Driver上的SparkEnv负责创建和注册到Driver的RpcEnv中。BlockManagerMasterEndpoint
只存在于Driver的SparkEnv中,Driver或Executor上BlockManagerMaster
的driverEndpoint属性将持有BlockManagerMasterEndpoint
的RpcEndpointRef
。BlockManagerMasterEndpoint
主要对各个节点上的BlockManager
、BlockManager
与Executor的映射关系及Block位置信息(即Block所在的BlockManager
)等进行管理。BlockManagerSlaveEndpoint
:每个Executor或Driver的SparkEnv中都有属于自己的BlockManagerSlaveEndpoint
,分别由各自的SparkEnv
负责创建和注册到各自的RpcEnv中。Driver或Executor都存在各自的BlockManagerSlaveEndpoint
,并由各自BlockManager
的slaveEndpoint
属性持有各自BlockManagerSlaveEndpoint
下发的命令。记号③表示BlockManagerMasterEndpoint
向Driver节点上的BlockManagerSlaveEndpoint
下发命令,记号④表示BlockManagerMasterEndpoint
向Executor节点上的BlockManagerSlaveEndpoint
下发命令。例如,删除Block、获取Block状态、获取匹配的BlockId等。SerializerManager
:序列化管理器MemoryManager
:内存管理器。负责对单个节点上内存的分配与回收MapOutPutTracker
:map任务输出跟踪器。ShuffleManager
:Shuffle管理器BlockTransferService
:块传输服务。此组件也与Shuffle相关联,主要用于不同阶段的任务之间的Block数据的传输与读写。shuffleClinet
:Shuffle的客户端。与BlockTransferService
配合使用。记号⑤表示Executor上的shuffleClient
通过Driver上的BlockTransferService
提供的服务上传和下载Block,记号⑥表示Driver上的shuffleClient
通过Executor上的BlockTransferService
提供的服务上传和下载Block。此外,不同Executor节点上的BlockTransferService
和shuffleClient
之间也可以互相上传、下载Block。SecurityManager
:安全管理器DiskBlockManager
:磁盘块管理器。对磁盘上的文件及目录的读写操作进行管理BlockInfoManager
:块信息管理器。负责对Block的元数据及锁资源进行管理MemoryStore
:内存存储。依赖于MemoryManager
,负责对Block的内存存在DiskStore
:磁盘存储。依赖于DiskBlockManager
,负责对Block的磁盘存储
6.2 Block信息管理器
BlockInfoManager
将主要对Block
的锁资源进行管理。
6.2.1 Block锁的基本概念
BlockInfoManager
是BlockManager
内部子组件之一。它对Block的锁管理,读锁是共享锁,写锁是排它锁。
6.2.2 Block锁的实现
BlockInfoManager
提供的方法实现Block
的锁管理机制。
registerTask
: 注册TaskAttenptId
。currentTaskAttemptId
: 获取上下文TaskContext
中当前值横在执行的任务尝试的TaskAttenptId
。lockForReading
: 锁定读。lockForWriting
: 锁定写。get
: 获取BlockId
对应的BlockInfo
。unlock
: 释放BlockId
对应的Block
上的锁。downgradeLock
: 锁降级。lockNewBlockForWriting
: 写新Block时获得写锁。releaseAllLocksForTask
: 释放给定的任务尝试线程所占用的所有Block的锁,并通知所有等待获取锁的线程。size
: 返回Infos的大小,即所有Block的数量。entries
: 以迭代器形式返回Infos
。removeBlock
: 移除BlockId
对用的·。- clear: 清除
BlockInfoManager
中的所有信息,并通知所有在BlockInfoManager
管理的Block的锁上等待的线程。
6.3 磁盘Block管理器
DiskBlockManager
是存储体系的成员之一。它来创建和控制逻辑上的映射关系(逻辑上的 block 和物理盘上的 locations)。 一个 block 被映射为一个文件 File,这个文件的文件名由指定的 BlockId.name
指定。
6.3.1 本地目录结构
localDirs
是DiskBlockManager
管理的本地目录数组,是通过调用createLocalDirs
方法创建的本地目录数组。
6.3.2 DiskBlockManager 提供的方法
getFile(filename: String)
getFile(blockId: BlockId)
containsBlock(blockId: BlockId)
: 检查本地localDirs
目录中是否包含BlockId
对应的文件。getAllFiles
: 获取本地localDirs
目录中所有文件。getAllBlocks
: 获取本地localDirs
目录中所有Block的BlockId
。createTempLocalBlock
: 为中间结果创建唯一的BlockId
和文件,此文件将用于保存本地Block的数据。createTempShuffleBlock
: 创建唯一的BlockId
和文件,用来存储Shuffle
中间结果(即map任务的输出)。stop
: 正常停止DiskBlockManager
。
6.4 磁盘存储 DiskStore
DiskStore 将负责将Block存储到磁盘。
属性:
conf
diskManager
: DiskBlockManagerminMemoryMapBytes
: 读取磁盘中的Block时,是直接读取还是使用FileChannel的内存镜像映射方法读取的阈值。
方法:
getSize(blockId: BlockId)
contaions(blockId: BlockId)
: 判断本地磁盘存储路径下是否包含给定的BlockId
所对应的Block文件。remove(blockId: BlockId)
putBytes(blockId: BlockId, bytes: ChunkedBytesBuffer)
: 将BlockId
所对应的Block写入磁盘,Block的内容已经封装为ChunkedBytesBuffer
。getBytes(blockId: BlockId)
: 读取对应的Block,并封装为ChunkedBytesBuffer
返回。
6.5 内存管理
Spark与Hadoop的重要区别之一就是对于内存的使用。
6.5.1 内存池模型
Spark将内存从逻辑上区分为堆内存和堆外内存,称为内存模式(MemoryModel
)。
@Private
public enum MemoryMode {
ON_HEAP,
OFF_HEAP
}
Spark一共有两种MemoryPool
的实现,分别是StorageMemoryPool
(存储体系用的内存池)和ExecutionMemoryPool
(计算引擎用到的内存池)
6.5.2 StorageMemoryPool详解
StorageMemoryPool
是对用于存储的物理内存的逻辑抽象,通过对存储内存的逻辑管理,提高Spark存储体系对内存的使用效率。
acquireMemory(blockId: BlockId, numBytes: Long): Boolean
用于给定BlockId的Block获取numBytes指定大小的内存。releaseMemory(size: Long): Unit
freeSpaceToShrinkPool(spaceToFree: Long): Long
用于释放指定大小的空间,缩小内存池的大小。
6.5.3 MemoryManager 模型
MemoryManager定义了内存管理的接口规范。
MemoryManager
有两个子类,分别是StaticMemoryManager
和UnifiedMemoryManager
。
StaticMemoryManager
是早期遗留的机制,不存在堆外内存模型,并且存储内存和执行内存的大小均为固定的。UnifiedMemoryManager
从Spark 1.6.0版本开始,作为默认的内存管理器。
6.5.4 UnifiedMemoryManager
UnifiedMemoryManager将计算内存和存储内存之间的边界修改为“软”边界,即任何一方可以向另一方借用空闲的内存。
maxOnHeapStorageMemory: Long
maxOffHeapStorageMemory: Long
acquireExecutionMemory
: 为存储BlockId
对应的Block,从堆内存或堆外内存获取所需大小的内存。acquireUnrollMemory
: 为展开BlockId
的Block,从堆内存或者堆外内存获取所需大小的内存。
6.6 内存存储 MemoryStore
MemoryStore负责将Block存储到内存。Spark通过将广播数据、RDD、Shuffle数据存储到内存,减少了对磁盘I/O的依赖,提高了程序的读写效率。
6.6.1 MemoryStore 的内存模型
Block在内存中以什么形式存在呢?是将文件直接缓存到内存?Spark将内存中的Block抽象为特质MemoryEntry。
private sealed trait MemoryEntry[T] {
def size: Long
def memoryMode: MemoryMode
def classTag: ClassTag[T]
}
MemoryStore 内存模型:
6.6.1 MemoryStore 提供的方法
便于对Block数据的存储和读取。
getSize(blockId: BlockId): Long
: 用于获取BlockId
对应MemoryEntry
(即Block的内存形式)所占用的大小putBytes[T: ClassTag]
: 将BlockId
对应的Block(已经封装为ChunkedByteBuffer
)写入内存reserveUnrollMemoryForThisTask
: 用于为展开尝试执行任务给定的Block保留指定内存模式上指定大小的内存。releaseUnrollMemoryForThisTask
: 用于释放任务尝试线程占用的内存- putIteratorAsValues: 此方法将
BlockId
对应的Block(已经转换为Iterator)写入内存。有时候放入内存的Block很大,所以一次性将此对象写入内存可能将引发OOM异常。为了避免这种情况的发生,首先需要将Block转换为Iterator,然后渐进式地展开此Iterator,并且周期性地检查是否有足够的展开内存。此方法涉及很多变量。 getBytes
: 从内存中读取BlockId
对应的Block(已经封装为ChunkedByteBuffer
),getBytes
只能获取序列化的Block。getValues
:从内存中读取BlockId
对应的Block(已经封装为Iterator),getValues
只能获取没有序列化的Block。remove(blockId: BlockId): Boolean
: 从内存中移除BlockId
对应的BlockevictBlocksToFreeSpace
: 用于驱逐Block,以便释放一些空间来存储新的Block。contains
: 用于判断本地MemoryStore
中是否包含给定的BlockId
所应对的Block文件。
6.7 块管理器 BlockManager
BlockManager
运行在每个节点上(包括Driver和Executor),提供对本地或远端节点上的内存、磁盘及堆外内存中Block的管理。存储体系从狭义上来说指的就是BlockManager
,从广义上来说,则包括整个Spark集群中的各个 BlockManager
、BlockInfoManager
、DiskBlockManager
、DiskStore
、MemoryManager
、MemoryStore
、对集群中的所有BlockManager
进行管理的BlockManagerMaster
及各个节点上对外提供Block上传与下载服务的BlockTransferService
。
6.7.1 BlockManager的初始化
每个Driver或Executor在创建自身的SparkEnv时都会创建BlockManager,BlockManager只有在其initialize方法被调用后才能发挥作用。
6.7.2 BlockManager提供的方法
reregister(): Unit
: 用于向BlockManagerMaster
重新注册BlockManager
,并向BlockManagerMaster
报告所有的Block信息。getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer]
:用于存储体系获取BlockId所对应Block的数据,并封装为ChunkedByteBuffer
后返回。getBlockData(blockId: BlockId): ManagedBuffer
: 此方法用于获取本地Block的数据。putBytes
:putBytes
实际调用的是doPutBytes
方法putBlockData
: 用于将Block数据写入本地getStatus(blockId: BlockId): Option[BlockStatus]
: 用于获取Block的状态getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId]
: 用于获取匹配过滤器条件的BlockId
的序列, 代码中除了从BlockInfoManager
的entries缓存中获取BlockId
外,还需要从DiskBlockManager
中获取,这是因为DiskBlockManager
中可能存在BlockInfoManager
不知道的Block。getLocalValues(blockId: BlockId): Option[BlockResult]
:用于从本地的BlockManager中获取Block数据。getLocations(blockId: BlockId): Seq[BlockManagerId]
:getRemoteBytes
方法的作用为从远端的BlockManager
以序列化的字节形式获取Block数据。get[T: ClassTag](blockId: BlockId): Option[BlockResult]
:用于优先从本地获取Block数据,当本地获取不到所需的Block数据,再从远端获取Block数据。downgradeLock(blockId: BlockId): Unit
:将当前线程持有的Block的写锁降级为读锁。releaseLock(blockId: BlockId): Unit
:用于当前线程对持有的Block的锁进行释放registerTask(taskAttemptId: Long): Unit
:用于将任务尝试线程注册到BlockInfoManager
。releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId]
: 用于任务尝试线程对持有的所有Block的锁进行释放。getOrElseUpdate
: 用于获取Block。如果Block存在,则获取此Block并返回BlockResult
,否则调用makeIterator
方法计算Block,并持久化后返回BlockResult
或Iterator
。putIterator
:此方法用于将Block数据写入存储体系。getDiskWriter
:用于创建并获取DiskBlockObjectWriter
,通过DiskBlockObjectWriter
可以跳过对DiskStore
的使用,直接将数据写入磁盘。dropFromMemory
:用于从内存中删除Block,当Block的存储级别允许写入磁盘,Block将被写入磁盘。此方法主要在内存不足,需要从内存腾出空闲空间时使用。removeRdd(rddId: Int): Int
:移除属于指定RDD
的所有Block。removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int
:移除属于指定Broadcast
的所有Block。
6.8 BlockManagerMaster对BlockManager的管理
BlockManagerMaster
的作用是对存在于Executor或Driver上的BlockManager
进行统一管理。Executor与Driver关于BlockManager
的交互都依赖于BlockManagerMaster
,比如Executor需要向Driver发送注册BlockManager
、更新Executor上Block的最新信息、询问所需要Block目前所在的位置及当Executor运行结束需要将此Executor移除等。但是Driver与Executor却位于不同机器中,该怎么实现呢?
在Spark执行环境一节中有介绍过,Driver上的BlockManagerMaster
会实例化并且注册BlockManagerMasterEndpoint
。无论是Driver还是Executor,它们的BlockManagerMaster
的driverEndpoint
属性都将持有BlockManagerMasterEndpoint
的RpcEndpiointRef
。无论是Driver还是Executor,每个BlockManager
都拥有自己的BlockManagerSlaveEndpoint
,且BlockManager
的slaveEndpoint
属性保存着各自BlockManagerSlaveEndpoint
的RpcEndpointRef
。BlockManagerMaster
负责发送消息,BlockManagerMasterEndpoint
负责消息的接收与处理,BlockManagerSlaveEndpoint
则接收BlockManagerMasterEndpoint
下发的命令。
6.8.1 BlockManagerMaster的职责
BlockManagerMaster
负责发送各种与存储体系相关的信息,这些消息的类型如下:
RemoveExecutor
(移除Executor)RegisterBlockManager
(注册BlockManager
)UpdateBlockInfo
(更新Block信息)GetLocations
(获取Block的位置)GetLocationsMultipleBlockIds
(获取多个Block的位置)GetPeers
(获取其它BlockManager
的BlockManagerId
)GetExecutorEndpointRef
(获取Executor的EndpointRef
引用)RemoveBlock
(移除Block)RemoveRdd
(移除Rdd
Block)RemoveShuffle
(移除Shuffle Block)RemoveBroadcast
(移除Broadcast Block)GetMemoryStatus
(获取指定的BlockManager
的内存状态)GetStorageStatus
(获取存储状态)GetMatchingBlockIds
(获取匹配过滤条件的Block)HasCachedBlocks
(指定的Executor上是否有缓存的Block)StopBlockManagerMaster
(停止BlockManagerMaster
)
6.8.2 BlockManagerMasterEndpoint详解
BlockManagerMasterEndpoint
接收Driver或Executor上BlockManagerMaster
发送的消息,对所有的BlockManager
统一管理BlockManager
的属性。
BlockManagerMasterEndpoint
接收的消息类型正好与BlockManagerMaster
所发送的消息一一对应。选取RegisterBlockManager
消息来介绍BlockManagerMasterEndpoint
是如何接收和处理RegisterBlockManager
消息。
6.8.3 BlockManagerSlaveEndpoint详解
BlockManagerSlaveEndpoint
用于接收BlockManagerMasterEndpoint
的命令并执行相应的操作。BlockManagerSlaveEndpoint
也重写了RpcEndpoint
的receiveAndReply
方法。
6.9 Block传输服务
BlockTransferService
是BlockManager
的子组件之一,抽象类BlockTransferService
有个实现类:
- 用于测试的
MockBlockTransferService
NettyBlockTransferService
BlockManager
实际采用了NettyBlockTransferService
提供的Block传输服务。
为什么要把由Netty实现的网络服务组件也放到存储体系里,由于Spark是分布式部署的,每个Task(准确说是任务尝试)最终都运行在不同的机器节点上。map任务的输出结果直接存储到map任务所在机器的存储体系中,reduce任务极有可能不在同一机器上运行,所以需要远程下载map任务的中间输出。NettyBlockTransferService
提供了可以被其它节点的客户端访问的Shuffle服务。
有了Shuffle的服务端,那么也需要相应的Shuffle客户端,以便当前节点将Block上传到其它节点或者从其它节点下载Block到本地。BlockManager
中创建Shuffle客户端的代码如下:
//org.apache.spark.storage.BlockManager
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled())
} else {
blockTransferService
}
6.9.1 初始化NettyBlockTransferService
NettyBlockTransferService
只有在其init
方法被调用 ,即被初始化后才提供服务。根据块管理器BlockManager
可知,BlockManager
在初始化的时候,将调用NettyBlockTransferService
的init
方法。
6.9.2 NettyBlockRpcServer详解
下面重点来看看NettyBlockTransferService
与NettyRpcEnv
的最大区别——使用的RpcHandler
实现类不同,NettyRpcEnv
采用了NettyRpcHandler
,而NettyBlockTransferService
则采用了NettyBlockRpcServer
。
-
OneForOneStreamManager
的实现NettyBlockRpcServer
中使用OneForOneStreamManager
来提供一对一的流服务。OneForOneStreamManager
实现StreamManager
的registerChannel
、getChunk
、connectionTerminated
、checkAuthorization
、registerStream
五个方法。OneForOneStreamManager
将处理ChunkFetchRequest
类型的消息。
6.9.3 Shuffle客户端
如果没有部署外部的Shuffle服务,即spark.shuffle.service.enabled
属性为false时,NettyBlockTransferService
不但通过OneForOneStreamManager
与NettyBlockRpcServer
对外提供Block上传与下载的服务,也将作为默认的Shuffle客户端。NettyBlockTransferService
作为Shuffle客户端,具有发起上传和下载请求并接收服务端响应的能力。NettyBlockTransferService
的两个方法——fetchBlocks
和uploadBlock
将具有此功能。
- 发送下载远端Block的请求
- 同步下载远端Block
- 发送想远端上传Block的请求
- 同步向远端上传Block
6.10 DiskBlockObjectWriter 详解
BlockManager
的getDiskWriter
方法用于创建DiskBlockObjectWriter
。DiskBlockObjectWriter
将在Shuffle阶段将map任务的输出写入磁盘,这样reduce任务就能从磁盘中获取map任务的中间输出了。
DiskBlockObjectWriter
用于将JVM中的对象直接写入磁盘文件中。DiskBlockObjectWriter
允许将数据追加到现有Block。为了提高效率,DiskBlockObjectWriter
保留了跨多个提交的底层文件通道。
open
: 用于打开要写入文件的各种输出流及管道。recordWritten
:用于对写入的记录数进行统计和度量write
: 用于向输出流中写入键值对。commitAdnGet
:用于将输出流中的数据写入到磁盘。