shilinlee/blog

Spark内核设计的艺术: 第6章 存储体系

shilinlee opened this issue · 0 comments

6.1 存储体系概述

简单来讲,Spark存储体系是各个Driver与Executor实例中的BlockManager所组成的;但是从一个整体来看,把各个节点的BlockManager看成存储体系的一部分,那存储体系就有了更多衍生的内容,比如块传输服务、map任务输出跟踪器、Shuffle管理器等。

image

  • BlockManagerMaster:代理BlockManager与Driver上的BlockManagerMasterEndpoint通信。记号①表示Executor节点上的BlockManager通过BockManagerMasterBlockManagerMasterEndpoint进行通信,记号②表示Driver节点上的BlockManager通过BlockManagerMasterBlockManagerMasterEndpoint进行通信。这些通信的内容有很多,例如,注册BlockManager、更新Block信息、获取Block的位置(即Block所在的BlockManager)、删除Executor等。BlockManagerMaster之所以能够和BlockManagerMasterEndpoint通信,是因为它持有了BlockManagerMasterEndpointRpcEndpointRef
  • BlockManagerMasterEndpoint:由Driver上的SparkEnv负责创建和注册到Driver的RpcEnv中。BlockManagerMasterEndpoint只存在于Driver的SparkEnv中,Driver或Executor上BlockManagerMaster的driverEndpoint属性将持有BlockManagerMasterEndpointRpcEndpointRefBlockManagerMasterEndpoint主要对各个节点上的BlockManagerBlockManager与Executor的映射关系及Block位置信息(即Block所在的BlockManager)等进行管理。
  • BlockManagerSlaveEndpoint:每个Executor或Driver的SparkEnv中都有属于自己的BlockManagerSlaveEndpoint,分别由各自的SparkEnv负责创建和注册到各自的RpcEnv中。Driver或Executor都存在各自的BlockManagerSlaveEndpoint,并由各自BlockManagerslaveEndpoint属性持有各自BlockManagerSlaveEndpoint下发的命令。记号③表示BlockManagerMasterEndpoint向Driver节点上的BlockManagerSlaveEndpoint下发命令,记号④表示BlockManagerMasterEndpoint向Executor节点上的BlockManagerSlaveEndpoint下发命令。例如,删除Block、获取Block状态、获取匹配的BlockId等。
  • SerializerManager:序列化管理器
  • MemoryManager:内存管理器。负责对单个节点上内存的分配与回收
  • MapOutPutTracker:map任务输出跟踪器。
  • ShuffleManager:Shuffle管理器
  • BlockTransferService:块传输服务。此组件也与Shuffle相关联,主要用于不同阶段的任务之间的Block数据的传输与读写。
  • shuffleClinet:Shuffle的客户端。与BlockTransferService配合使用。记号⑤表示Executor上的shuffleClient通过Driver上的BlockTransferService提供的服务上传和下载Block,记号⑥表示Driver上的shuffleClient通过Executor上的BlockTransferService提供的服务上传和下载Block。此外,不同Executor节点上的BlockTransferServiceshuffleClient之间也可以互相上传、下载Block。
  • SecurityManager:安全管理器
  • DiskBlockManager:磁盘块管理器。对磁盘上的文件及目录的读写操作进行管理
  • BlockInfoManager:块信息管理器。负责对Block的元数据及锁资源进行管理
  • MemoryStore:内存存储。依赖于MemoryManager,负责对Block的内存存在
  • DiskStore:磁盘存储。依赖于DiskBlockManager,负责对Block的磁盘存储

6.2 Block信息管理器

BlockInfoManager将主要对Block的锁资源进行管理。

6.2.1 Block锁的基本概念

BlockInfoManagerBlockManager内部子组件之一。它对Block的锁管理,读锁是共享锁,写锁是排它锁。

image

6.2.2 Block锁的实现

BlockInfoManager提供的方法实现Block的锁管理机制。

  • registerTask: 注册TaskAttenptId
  • currentTaskAttemptId: 获取上下文TaskContext中当前值横在执行的任务尝试的TaskAttenptId
  • lockForReading: 锁定读。
  • lockForWriting: 锁定写。
  • get: 获取BlockId对应的BlockInfo
  • unlock: 释放BlockId对应的Block上的锁。
  • downgradeLock: 锁降级。
  • lockNewBlockForWriting: 写新Block时获得写锁。
  • releaseAllLocksForTask: 释放给定的任务尝试线程所占用的所有Block的锁,并通知所有等待获取锁的线程。
  • size: 返回Infos的大小,即所有Block的数量。
  • entries: 以迭代器形式返回Infos
  • removeBlock: 移除BlockId对用的·。
  • clear: 清除BlockInfoManager中的所有信息,并通知所有在BlockInfoManager管理的Block的锁上等待的线程。

6.3 磁盘Block管理器

DiskBlockManager是存储体系的成员之一。它来创建和控制逻辑上的映射关系(逻辑上的 block 和物理盘上的 locations)。 一个 block 被映射为一个文件 File,这个文件的文件名由指定的 BlockId.name 指定。

6.3.1 本地目录结构

localDirsDiskBlockManager管理的本地目录数组,是通过调用createLocalDirs方法创建的本地目录数组。

image

6.3.2 DiskBlockManager 提供的方法

  • getFile(filename: String)
  • getFile(blockId: BlockId)
  • containsBlock(blockId: BlockId): 检查本地localDirs目录中是否包含BlockId对应的文件。
  • getAllFiles: 获取本地localDirs目录中所有文件。
  • getAllBlocks: 获取本地localDirs目录中所有Block的BlockId
  • createTempLocalBlock: 为中间结果创建唯一的BlockId和文件,此文件将用于保存本地Block的数据。
  • createTempShuffleBlock: 创建唯一的BlockId和文件,用来存储Shuffle中间结果(即map任务的输出)。
  • stop: 正常停止DiskBlockManager

6.4 磁盘存储 DiskStore

DiskStore 将负责将Block存储到磁盘。

属性:

  • conf
  • diskManager: DiskBlockManager
  • minMemoryMapBytes: 读取磁盘中的Block时,是直接读取还是使用FileChannel的内存镜像映射方法读取的阈值。

方法:

  • getSize(blockId: BlockId)
  • contaions(blockId: BlockId): 判断本地磁盘存储路径下是否包含给定的BlockId所对应的Block文件。
  • remove(blockId: BlockId)
  • putBytes(blockId: BlockId, bytes: ChunkedBytesBuffer): 将BlockId所对应的Block写入磁盘,Block的内容已经封装为ChunkedBytesBuffer
  • getBytes(blockId: BlockId): 读取对应的Block,并封装为ChunkedBytesBuffer返回。

6.5 内存管理

Spark与Hadoop的重要区别之一就是对于内存的使用。

6.5.1 内存池模型

Spark将内存从逻辑上区分为堆内存堆外内存,称为内存模式(MemoryModel)。

@Private
public enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}

Spark一共有两种MemoryPool的实现,分别是StorageMemoryPool(存储体系用的内存池)和ExecutionMemoryPool(计算引擎用到的内存池)

6.5.2 StorageMemoryPool详解

StorageMemoryPool是对用于存储的物理内存的逻辑抽象,通过对存储内存的逻辑管理,提高Spark存储体系对内存的使用效率。

  • acquireMemory(blockId: BlockId, numBytes: Long): Boolean 用于给定BlockId的Block获取numBytes指定大小的内存。
  • releaseMemory(size: Long): Unit
  • freeSpaceToShrinkPool(spaceToFree: Long): Long 用于释放指定大小的空间,缩小内存池的大小。

6.5.3 MemoryManager 模型

MemoryManager定义了内存管理的接口规范。

MemoryManager有两个子类,分别是StaticMemoryManagerUnifiedMemoryManager

StaticMemoryManager是早期遗留的机制,不存在堆外内存模型,并且存储内存和执行内存的大小均为固定的。UnifiedMemoryManager从Spark 1.6.0版本开始,作为默认的内存管理器。

6.5.4 UnifiedMemoryManager

UnifiedMemoryManager将计算内存和存储内存之间的边界修改为“软”边界,即任何一方可以向另一方借用空闲的内存。

  • maxOnHeapStorageMemory: Long
  • maxOffHeapStorageMemory: Long
  • acquireExecutionMemory : 为存储BlockId对应的Block,从堆内存或堆外内存获取所需大小的内存。
  • acquireUnrollMemory: 为展开BlockId的Block,从堆内存或者堆外内存获取所需大小的内存。

6.6 内存存储 MemoryStore

MemoryStore负责将Block存储到内存。Spark通过将广播数据、RDD、Shuffle数据存储到内存,减少了对磁盘I/O的依赖,提高了程序的读写效率。

6.6.1 MemoryStore 的内存模型

Block在内存中以什么形式存在呢?是将文件直接缓存到内存?Spark将内存中的Block抽象为特质MemoryEntry。

private sealed trait MemoryEntry[T] {
  def size: Long
  def memoryMode: MemoryMode
  def classTag: ClassTag[T]
}

MemoryStore 内存模型:

image

6.6.1 MemoryStore 提供的方法

便于对Block数据的存储和读取。

  • getSize(blockId: BlockId): Long : 用于获取BlockId对应MemoryEntry(即Block的内存形式)所占用的大小
  • putBytes[T: ClassTag]: 将BlockId对应的Block(已经封装为ChunkedByteBuffer)写入内存
  • reserveUnrollMemoryForThisTask: 用于为展开尝试执行任务给定的Block保留指定内存模式上指定大小的内存。
  • releaseUnrollMemoryForThisTask: 用于释放任务尝试线程占用的内存
  • putIteratorAsValues: 此方法将BlockId对应的Block(已经转换为Iterator)写入内存。有时候放入内存的Block很大,所以一次性将此对象写入内存可能将引发OOM异常。为了避免这种情况的发生,首先需要将Block转换为Iterator,然后渐进式地展开此Iterator,并且周期性地检查是否有足够的展开内存。此方法涉及很多变量。
  • getBytes: 从内存中读取BlockId对应的Block(已经封装为ChunkedByteBuffer),getBytes只能获取序列化的Block。
  • getValues:从内存中读取BlockId对应的Block(已经封装为Iterator),getValues只能获取没有序列化的Block。
  • remove(blockId: BlockId): Boolean : 从内存中移除BlockId对应的Block
  • evictBlocksToFreeSpace: 用于驱逐Block,以便释放一些空间来存储新的Block。
  • contains: 用于判断本地MemoryStore中是否包含给定的BlockId所应对的Block文件。

6.7 块管理器 BlockManager

BlockManager运行在每个节点上(包括Driver和Executor),提供对本地或远端节点上的内存、磁盘及堆外内存中Block的管理。存储体系从狭义上来说指的就是BlockManager,从广义上来说,则包括整个Spark集群中的各个 BlockManagerBlockInfoManagerDiskBlockManagerDiskStoreMemoryManagerMemoryStore、对集群中的所有BlockManager进行管理的BlockManagerMaster及各个节点上对外提供Block上传与下载服务的BlockTransferService

6.7.1 BlockManager的初始化

每个Driver或Executor在创建自身的SparkEnv时都会创建BlockManager,BlockManager只有在其initialize方法被调用后才能发挥作用。

6.7.2 BlockManager提供的方法

  • reregister(): Unit: 用于向BlockManagerMaster重新注册BlockManager,并向BlockManagerMaster报告所有的Block信息。
  • getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer]:用于存储体系获取BlockId所对应Block的数据,并封装为ChunkedByteBuffer后返回。
  • getBlockData(blockId: BlockId): ManagedBuffer : 此方法用于获取本地Block的数据。
  • putBytesputBytes实际调用的是doPutBytes方法
  • putBlockData: 用于将Block数据写入本地
  • getStatus(blockId: BlockId): Option[BlockStatus]: 用于获取Block的状态
  • getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId]: 用于获取匹配过滤器条件的BlockId 的序列, 代码中除了从BlockInfoManager的entries缓存中获取BlockId外,还需要从DiskBlockManager中获取,这是因为DiskBlockManager中可能存在BlockInfoManager不知道的Block。
  • getLocalValues(blockId: BlockId): Option[BlockResult]:用于从本地的BlockManager中获取Block数据。
  • getLocations(blockId: BlockId): Seq[BlockManagerId]getRemoteBytes方法的作用为从远端的BlockManager以序列化的字节形式获取Block数据。
  • get[T: ClassTag](blockId: BlockId): Option[BlockResult] :用于优先从本地获取Block数据,当本地获取不到所需的Block数据,再从远端获取Block数据。
  • downgradeLock(blockId: BlockId): Unit:将当前线程持有的Block的写锁降级为读锁。
  • releaseLock(blockId: BlockId): Unit:用于当前线程对持有的Block的锁进行释放
  • registerTask(taskAttemptId: Long): Unit:用于将任务尝试线程注册到BlockInfoManager
  • releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId]: 用于任务尝试线程对持有的所有Block的锁进行释放。
  • getOrElseUpdate: 用于获取Block。如果Block存在,则获取此Block并返回BlockResult,否则调用makeIterator方法计算Block,并持久化后返回BlockResultIterator
  • putIterator:此方法用于将Block数据写入存储体系。
  • getDiskWriter:用于创建并获取DiskBlockObjectWriter,通过DiskBlockObjectWriter可以跳过对DiskStore的使用,直接将数据写入磁盘。
  • dropFromMemory:用于从内存中删除Block,当Block的存储级别允许写入磁盘,Block将被写入磁盘。此方法主要在内存不足,需要从内存腾出空闲空间时使用。
  • removeRdd(rddId: Int): Int :移除属于指定RDD的所有Block。
  • removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int:移除属于指定Broadcast的所有Block。

6.8 BlockManagerMaster对BlockManager的管理

BlockManagerMaster 的作用是对存在于Executor或Driver上的BlockManager进行统一管理。Executor与Driver关于BlockManager的交互都依赖于BlockManagerMaster,比如Executor需要向Driver发送注册BlockManager、更新Executor上Block的最新信息、询问所需要Block目前所在的位置及当Executor运行结束需要将此Executor移除等。但是Driver与Executor却位于不同机器中,该怎么实现呢?

在Spark执行环境一节中有介绍过,Driver上的BlockManagerMaster 会实例化并且注册BlockManagerMasterEndpoint。无论是Driver还是Executor,它们的BlockManagerMasterdriverEndpoint属性都将持有BlockManagerMasterEndpointRpcEndpiointRef。无论是Driver还是Executor,每个BlockManager都拥有自己的BlockManagerSlaveEndpoint,且BlockManagerslaveEndpoint属性保存着各自BlockManagerSlaveEndpointRpcEndpointRefBlockManagerMaster负责发送消息,BlockManagerMasterEndpoint负责消息的接收与处理,BlockManagerSlaveEndpoint则接收BlockManagerMasterEndpoint下发的命令。

6.8.1 BlockManagerMaster的职责

BlockManagerMaster负责发送各种与存储体系相关的信息,这些消息的类型如下:

  • RemoveExecutor(移除Executor)
  • RegisterBlockManager(注册BlockManager
  • UpdateBlockInfo(更新Block信息)
  • GetLocations(获取Block的位置)
  • GetLocationsMultipleBlockIds(获取多个Block的位置)
  • GetPeers(获取其它BlockManagerBlockManagerId
  • GetExecutorEndpointRef(获取Executor的EndpointRef引用)
  • RemoveBlock(移除Block)
  • RemoveRdd(移除Rdd Block)
  • RemoveShuffle(移除Shuffle Block)
  • RemoveBroadcast(移除Broadcast Block)
  • GetMemoryStatus(获取指定的BlockManager的内存状态)
  • GetStorageStatus(获取存储状态)
  • GetMatchingBlockIds(获取匹配过滤条件的Block)
  • HasCachedBlocks(指定的Executor上是否有缓存的Block)
  • StopBlockManagerMaster(停止BlockManagerMaster

6.8.2 BlockManagerMasterEndpoint详解

BlockManagerMasterEndpoint接收Driver或Executor上BlockManagerMaster发送的消息,对所有的BlockManager统一管理BlockManager的属性。

BlockManagerMasterEndpoint接收的消息类型正好与BlockManagerMaster所发送的消息一一对应。选取RegisterBlockManager消息来介绍BlockManagerMasterEndpoint是如何接收和处理RegisterBlockManager消息。

6.8.3 BlockManagerSlaveEndpoint详解

BlockManagerSlaveEndpoint用于接收BlockManagerMasterEndpoint的命令并执行相应的操作。BlockManagerSlaveEndpoint也重写了RpcEndpointreceiveAndReply方法。

6.9 Block传输服务

BlockTransferServiceBlockManager的子组件之一,抽象类BlockTransferService有个实现类:

  • 用于测试的MockBlockTransferService
  • NettyBlockTransferService

BlockManager实际采用了NettyBlockTransferService提供的Block传输服务。

为什么要把由Netty实现的网络服务组件也放到存储体系里,由于Spark是分布式部署的,每个Task(准确说是任务尝试)最终都运行在不同的机器节点上。map任务的输出结果直接存储到map任务所在机器的存储体系中,reduce任务极有可能不在同一机器上运行,所以需要远程下载map任务的中间输出。NettyBlockTransferService提供了可以被其它节点的客户端访问的Shuffle服务。

有了Shuffle的服务端,那么也需要相应的Shuffle客户端,以便当前节点将Block上传到其它节点或者从其它节点下载Block到本地。BlockManager中创建Shuffle客户端的代码如下:

//org.apache.spark.storage.BlockManager
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
    securityManager.isSaslEncryptionEnabled())
} else {
  blockTransferService
}

6.9.1 初始化NettyBlockTransferService

NettyBlockTransferService只有在其init方法被调用 ,即被初始化后才提供服务。根据块管理器BlockManager可知,BlockManager在初始化的时候,将调用NettyBlockTransferServiceinit方法。

6.9.2 NettyBlockRpcServer详解

下面重点来看看NettyBlockTransferServiceNettyRpcEnv的最大区别——使用的RpcHandler实现类不同,NettyRpcEnv采用了NettyRpcHandler,而NettyBlockTransferService则采用了NettyBlockRpcServer

  • OneForOneStreamManager的实现

    NettyBlockRpcServer中使用OneForOneStreamManager来提供一对一的流服务。OneForOneStreamManager实现StreamManagerregisterChannelgetChunkconnectionTerminatedcheckAuthorizationregisterStream五个方法。OneForOneStreamManager将处理ChunkFetchRequest类型的消息。

  • NettyBlockRpcServer的实现

6.9.3 Shuffle客户端

如果没有部署外部的Shuffle服务,即spark.shuffle.service.enabled属性为false时,NettyBlockTransferService不但通过OneForOneStreamManagerNettyBlockRpcServer对外提供Block上传与下载的服务,也将作为默认的Shuffle客户端。NettyBlockTransferService作为Shuffle客户端,具有发起上传和下载请求并接收服务端响应的能力。NettyBlockTransferService的两个方法——fetchBlocksuploadBlock将具有此功能。

  • 发送下载远端Block的请求
  • 同步下载远端Block
  • 发送想远端上传Block的请求
  • 同步向远端上传Block

6.10 DiskBlockObjectWriter 详解

BlockManagergetDiskWriter方法用于创建DiskBlockObjectWriterDiskBlockObjectWriter 将在Shuffle阶段将map任务的输出写入磁盘,这样reduce任务就能从磁盘中获取map任务的中间输出了。

DiskBlockObjectWriter 用于将JVM中的对象直接写入磁盘文件中。DiskBlockObjectWriter 允许将数据追加到现有Block。为了提高效率,DiskBlockObjectWriter 保留了跨多个提交的底层文件通道。

  • open: 用于打开要写入文件的各种输出流及管道。
  • recordWritten:用于对写入的记录数进行统计和度量
  • write: 用于向输出流中写入键值对。
  • commitAdnGet:用于将输出流中的数据写入到磁盘。