shilinlee/blog

Spark内核设计的艺术: 第9章 部署模式

shilinlee opened this issue · 0 comments

9.1 心跳接收器 HeartbeatReceiver

HeartbeatReceiver 运行在Driver上,用以接收各个Executor的心跳消息,对各个Executor的"生死”进行监控。

  • 注册Executor
    HeartbeatReceiver继承了SparkListener,并实现了onExecutorAdded方法(见代码清单9.1)。根据3.3节的内容,我们知道事件总线在接收到SparkListenerExecutorAdded消息后,将调用HeartbeatReceiver的onExecutorAdded方法,这样HeartbeatReceiver将监听到Executor的添加。

      /**
       * If the heartbeat receiver is not stopped, notify it of executor registrations.
       */
      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
        addExecutor(executorAdded.executorId)
      }

    addExecutor实现如下:

      /**
       * Send ExecutorRegistered to the event loop to add a new executor. Only for test.
       *
       * @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
       *         indicate if this operation is successful.
       */
      def addExecutor(executorId: String): Option[Future[Boolean]] = {
        Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
      }
  • 移除 Executor

  • TaskSchedulerIsSet 消息

  • 检查超时的Executor

  • Executor的心跳

    image

9.2 Executor 的实现分析

9.2.1 Executor的心跳报告

在初始化Executor的过程中,Executor会调用自己的startDriverHeartbeater方法启动心跳报告的定时任务。

9.2.2 运行Task

Executor 的launchTask方法用于运行Task。

9.3 local 部署模式

本书在讲解各章内容时,主要以local模式为例。local部署模式只有Driver,没有Master和Worker,执行任务的Executor与Driver在同一个JVM进程内。local模式中使用的ExecutorBackend和SchedulerBackend的实现类都是LocalSchedulerBackend。在第7章介绍调度系统时已经分析过local部署模式下的LocalSchedulerBackend、LocalEndpoint等组件的实现,本节将以图形的方式展现local部署模式的启动、local部署模式下的任务提交与执行等流程。

local部署模式的启动过程如图9.2所示。

image

9.4 持久化引擎 PersistentEngine

PersistenceEngine用于当Master发生故障后,通过领导选举选择其他Master接替整个集群的管理工作时,能够使得新激活的Master有能力从故障中恢复整个集群的状态信息,进而恢复对集群资源的管理和分配。抽象类PersistenceEngine定义了对Master必需的任何状态信息进行持久化的接口规范。实现PersistenceEngine必须满足以下机制。

  • 在完成新的Application的注册之前,addApplication方法必须被调用。
  • 在完成新的Worker的注册之前,addWorker方法必须被调用。
  • removeApplication方法和removeWorker方法可以在任何时候调用。

在以上机制的保证下,整个集群的Worker、Driver和Application的信息都被持久化,集群因此可以在领导选举和主Master的切换后,对集群状态进行恢复。

image

BlackHolePersistentEngine都是空实现,CustomPersistentEngine是用于单元测试的实现。其他两个是用于生产环境的实现类。

9.4.1 基于文件系统的持久化引擎

FileSystemPersistenceEngine是基于文件系统的持久化引擎。对于ApplicationInfo、WorkerInfo及DriverInfo,FiIeSystemPersistenceEngine会将它们的数据存储到磁盘上的单个文件夹中,当要移除它们时,这些磁盘文件将被删除。由于不同的Master往往不在同一个机器节点上,因此在使用FileSystemPersistenceEngine时,底层的文件系统应该是分布式的。

9.4.2 基于ZooKeeperPersistenceEngine的久化引擎

ZooKeeperPersistenceEngine是基于ZooKeeper的持久化引擎。对于ApplicationInfo、WorkerInfo及DriverInfo,ZooKeeperPersistenceEngine会将它们的数据存储到ZooKeeper的不同节点(也称为Znode)中,当要移除它们时,这些节点将被删除。

ZooKeeperPersistenceEngine有以下属性。

  • conf: 即SparkConf。
  • serializer:持久化时使用的序列化器。
  • WORKING_DIR:ZooKeeperPersistenceEngine在ZooKeeper上的工作目录,是spark基于ZooKeeper进行热备的根节点(可通过spark.deploy.ZooKeeper.dir属性配置,默认为spark)的子节点master_status。
  • zk:连接ZooKeeper的客户端类型为CuratorFramework。

小贴士Curator是Netflix公司开源的一个ZooKeeper客户端,与ZooKeeper提供的原生客户端相比,Curator的抽象层次更高,其核心目标是帮助工程师管理zooKeeper的相关操作,简化ZooKeeper客户端的开发量。Curator现已提升为Apache的顶级项目。Curator-Framework就是Curator提供的APIO

9.5 领导选举代理

领导选举机制(Leader Election)可以保证集群虽然存在多个Master,但是只有一个Master处于激活(Active)状态,其他的Master处于支持(Standby)状态。当Active状态的Master出现故障时,会选举出一个standby状态的Master作为新的Active状态的Master。由于整个集群的worker,Driver和Application的信息都已经通过持久化引擎持久化,因此切换Master时只会影响新任务的提交,对于正在运行中的任务没有任何影响。

特质LeaderElectionAgent定义了对当前的Master进行跟踪和领导选举代理的通用接口,其定义如下。

/**
 * :: DeveloperApi ::
 *
 * A LeaderElectionAgent tracks current master and is a common interface for all election Agents.
 */
@DeveloperApi
trait LeaderElectionAgent {
  val masterInstance: LeaderElectable
  def stop() {} // to avoid noops in implementations.
}

masterInstance 属性类型是LeaderElectable,特质LeaderElectable的定义如下:

@DeveloperApi
trait LeaderElectable {
  def electedLeader(): Unit   // 被选举为领导
  def revokedLeadership(): Unit  // 撤销领导关系
}
  1. MonarchyLeaderAgent 详解
  2. ZookeeperLeadeElectionrAgent 详解

9.6 Master 详解

Master是local-cluster部署模式和Standalone部署模式中,整个Spark集群最为重要的组件之一,它的设计将直接决定整个集群的可扩展性、可用性和容错性。Master的职责包括worker的管理、Application的管理、Driver的管理等。Master负责对整个集群中所有资源的统一管理和分配,它接收各个worker的注册、更新状态、心跳等消息,也接收Driver和Application的注册。

worker向Master注册时会携带自身的身份和资源信息(如ID、host、P酾、内核数、内存大小等),这些资源将按照一定的资源调度策略分配给或Application。Master给Driver分配了资源后,将向Worker发送启动Driver的命令,后者在接收到启动Driver的命令后启动Driver。Master给Application分配了资源后,将向Worker发送启动Executor的命令,后者在接收到启动Executor的命令后启动Executor。

Master接收Worker的状态更新消息,用于“杀死”不匹配的Driver或Application。worker向Master发送的心跳消息有两个目的:一是告知Master自己还“活着”,另外则是某个出现故障后,通过领导选举选择了其他Master负责对整个集群的管理,此时被激活的Master可能并没有缓存worker的相关信息,因此需要告知Worker重新向新的
Master注册。

本节主要对Master进行详细分析,理解local-cluster部署模式和StandaIone部署模式下Master如何对整个集群的资源进行管理和分配,但在此之前先需要按部就班地了解Master包含的属性。

****** 识别结果 1******

9.6.1 启动Master

启动Master有作为JVM进程内的对象启动和作为单独的进程启动的两种方式。以对象启动的方式主要用于local-cluster模式,而作为进程启动则用于standalone模式。

  1. 对象方式启动
    Master的伴生对象的startRpcEnvAndEndpoint方法用于创建Master对象,并将Master对象注册到RpcEnv中完成对Master对象的启动。

      /**
       * Start the Master and return a three tuple of:
       *   (1) The Master RpcEnv
       *   (2) The web UI bound port
       *   (3) The REST server bound port, if any
       */
      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
        val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
          new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
      }
    }
    1. 进程方式启动

      Master的伴生对象中实现了main方法,这样就可以作为单独的JVM进程启动了。

        def main(argStrings: Array[String]) {
          Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
            exitOnUncaughtException = false))
          Utils.initDaemon(log)
          val conf = new SparkConf
          val args = new MasterArguments(argStrings, conf)
          val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
          rpcEnv.awaitTermination()
        }

9.6.2 检查worker超时

经过上一小节对启动Master的分析,我们知道定时任务checkForWorkerTimeOutTask是以WORKER_TIMEOUT_MS为时间间隔,通过不断向Master自身发送CheckForWorkerTime0ut消息来实现对worker的超时检查的。Master也继承自RpcEndpoint,Master实现的receive方法中处理CheckForWorkerTimeOut消息的代码如下。

 case CheckForWorkerTimeOut =>
   timeOutDeadWorkers()
  /** Check for, and remove, any timed-out workers */
  private def timeOutDeadWorkers() {
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    val currentTime = System.currentTimeMillis()
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray
    for (worker <- toRemove) {
      if (worker.state != WorkerState.DEAD) {
        val workerTimeoutSecs = TimeUnit.MILLISECONDS.toSeconds(workerTimeoutMs)
        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
          worker.id, workerTimeoutSecs))
        removeWorker(worker, s"Not receiving heartbeat for $workerTimeoutSecs seconds")
      } else {
        if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * workerTimeoutMs)) {
          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
        }
      }
    }
  }

9.6.3 被选举为领导时的处理

Master基于高可用性的考虑,可以同时启动多个Master。这些中只有一个是激活(Active)状态的,其余的都是支持(Standby)状态。根据9.5节的介绍,Master为了具备故障迁移的能力,它实现了LeaderElectable接囗,因此当Master被选举为领导时,领导选举代理(LeaderElectionAgent)将会调用Master的electedLeader方法。electedLeader方法的实现如下所示。

  override def electedLeader() {
    self.send(ElectedLeader)
  }

9.6.4 一级资源调度

9.6.5 注册Worker

在Spark集群中,Master接收到提交的应用程序后,需要根据应用的资源需求,将应用分配到worker上去运行。一个集群刚开始的时候只有Master,为了让后续启动的worker加人到Master的集群中,每个Worker都需要在启动的时候向Master注册,Master接收到worker的注册信息后,将把的各种重要信息(如ID、host、port、内核数、内存大小等信息)缓存起来,以便进行资源的分配与调度。Master为了容灾,还将worker的信息通过持久化引擎进行持久化,以便经过领导选举出的新Mar能够将集群的状态从错误或灾难中恢复。

Master的receiveAndReply方法中实现了对Worker发送的RegisterWorker消息进行处理的实现。

9.6.6 更新Worker的最新状态

Worker在向Master注册成功后,会向Master发送WorkerLatestState消息。WorkerLatestState消息将携带Worker的身份标识、worker节点的所有Executor的描述信息、调度到当前Worker的所有Driver的身份标识。Master接收到WorkerLatestState消息的处理。

9.6.7 处理Worker的心跳

向Master注册Worker,可以让Master知道worker的资源配置,进而通过资源调度使得Driver及Executor可以在Worker上执行。如果Worker的JVM进程发生了崩溃或者Worker所在的机器宕机或网络不通,那么Master所维护的关于worker的注册信息将变得不可用。

为了让Master及时得知Worker的最新状态,需要向Master发送心跳,Master将根据worker的心跳更新Worker的最后心跳时间,以便为整个集群的健康工作提供参考。Master的receive方法中实现了对Worker的心跳的处理。

    case Heartbeat(workerId, worker) =>
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          if (workers.map(_.id).contains(workerId)) {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " Asking it to re-register.")
            worker.send(ReconnectWorker(masterUrl))
          } else {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " This worker was never registered, so ignoring the heartbeat.")
          }
      }

根据代码清单,Master接收到Heartbeat消息后,将从idToWorker中找出缓存的WorkerInfo,并将workerlnfo的最后心跳时间(lastHeartbeat)更新为系统当前时间的时间戳。如果idToWorker中没有缓存的workerlnfo,且workers中有对应的Workerinfo(这说明定时任务checkForWorkerTime0utTask检查到worker超时,但是workerlnfo的状态不是DEAD,那么在调用removeworker方法时将workerlnfo从idToWorker中清除,此时的workers中仍然持有WorkerInfo),那么向Worker发送ReconnectWorker消息。如果idToWorker中没有缓存的WorkerInfo,且workers中也没有对应的WorkerInfo,那么说明checkForWorkerTimeOutTask已经发现Worker很长时间没有心跳,并且WorkerInfo的状态为DEAD后,将WorkerInfo从workers中也移除了。

9.6.8 注册Application

9.6.9 处理Executor的申请

9.6.10 处理Executor的状态变化

9.6.11 Master的常用方法

9.7 Worker详解

worker是Spark在local-cluster部署模式和Standa10ne部署模式中对工作节点的资源和Executor进行管理的服务。Worker一方面向Master汇报自身所管理的资源信息,一方面接收Master的命令运行Drrver或者为Apphcatlon运行Executor。同一个机器上可以同时部署多个Worker服务,一个worker也可以启动多个Executor。当Executor完成后,Worker将回收使用的资源。

9.7.1 启动Worker

启动Worker有作为JVM进程内的对象启动和作为单独的进程启动的两种方式。以对象启动的方式主要用于local-cluster模式,而作为进程启动则用于standalone模式。

9.7.2 向Master注册Worker

worker在启动后,需要加人到Master管理的整个集群中,以参与Dnver、Executor的资源调度。Worker要加入Mar管理的集群,就必须将注册到Mastero在启动Worker的过程中需要调用registerWithMaster方法向Master注册Worker。

9.7.3 向Master发送心跳

为了让Mar得知Worker依然健康运行着,就需要不断地告诉Master:“我活着”,这个过程是通过发送心跳实现的。

根据之前的内容我们知道,当Worker向Mar注册成功后会接收到Master回复的RegisteredWorker消息,Worker使用handleRegisterResponse方法处理RegisteredWorker消息时,将会向forwordMessageScheduler提交以HEARTBEATMILLIS作为间隔向Worker自身发送SendHeaflbeat消息的定时任务。Worker的receive方法实现了对SendHeartbeac消息的处理。

9.7.4 Worker与领导选举

9.7.5 运行 Driver

在介绍 Master对 Driver的资源调度和运行时,我们知道 Master将向 Worker发送LaunchDriver消息以运行 Driver。下面一起来看 Worker接收到 LaunchDriver消息后,是如何运行 Driver的。

9.7.6 运行 Executor

9.7.7 处理Executor的状态变化

9.8 StandloneAppClient 实现

StandloneAppClient 是在standalone模式下,Application与集群管理器进行对话的客户端。

9.8.1 ClientEndpoint 的实现分析

Spark各个组件之间的通信离不开RpcEnv以及RpcEndpoint。ClientEndpoint 继承自ThreadSafeRpcEndpoint,也是StandloneAppClient 的内部类,StandloneAppClient 依赖于ClientEndpoint 与集群管理器进行通信。

9.8.2 StandloneAppClient 的实现分析

StandloneAppClient 最为核心的功能是向集群管理器请求或“杀死”Executor。

9.9 StandloneSchedulerBackend 的实现分析

在7.8.2节我们曾经介绍了loca部署模式下, SchedulerBackend的实现类 LocalSchedulerBackend,本节将介绍在local- cluster模式和 Standalone模式下, SchedulerBackend的另个实现类 StandaloneScheduler Backend。由于 Standalonescheduler Backend继承自 CoarseGrainedSchedulerBackend,本节还需要介绍 CoarseGrainedSchedulerBackend及其与其他组件通信的内部类 DriverEndpoint。