apache-spark-on-k8s/spark

About the "RetrieveSparkAppConfig" message

pierric opened this issue · 4 comments

I often see the executor pod report that its connection to the driver is not active and that it will shut down in 120s. In the driver pod's log, the driver is waiting for a "RegisterExecutor" message from the executor. That sounds very odd: the driver and the executor are waiting for each other!

I traced through the code and found something that does not look right, but I am not really sure. Could someone take a look?

  • core\src\main\scala\org\apache\spark\executor\CoarseGrainedExecutorBackend.scala. I think it contains the main loop of the executor. Here is one line:
    val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(executorId))
    It is executed unconditionally and asks the driver for a configuration.

  • \resource-managers\kubernetes\core\src\main\scala\org\apache\spark\scheduler\cluster\k8s\KubernetesClusterSchedulerBackend.scala. I think it contains the main loop of the driver pod. Here is how it handles RetrieveSparkAppConfig:

  override def receiveAndReply(
      context: RpcCallContext): PartialFunction[Any, Unit] = {
    new PartialFunction[Any, Unit]() {
      override def isDefinedAt(msg: Any): Boolean = {
        msg match {
          case RetrieveSparkAppConfig(_) =>
            shuffleManager.isDefined
          case _ => false
        }
      }

      override def apply(msg: Any): Unit = {
        msg match {
          case RetrieveSparkAppConfig(executorId) if shuffleManager.isDefined =>
            val runningExecutorPod = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
              kubernetesClient
                .pods()
                .withName(runningExecutorsToPods(executorId).getMetadata.getName)
                .get()
            }
            val shuffleSpecificProperties = shuffleManager.get
              .getShuffleServiceConfigurationForExecutor(runningExecutorPod)
            val reply = SparkAppConfig(
              sparkProperties ++ shuffleSpecificProperties,
              SparkEnv.get.securityManager.getIOEncryptionKey())
            context.reply(reply)
        }
      }
    }.orElse(super.receiveAndReply(context))
  }

It seems that the driver pod replies to RetrieveSparkAppConfig only if the shuffle service is configured.

I didn't enable the shuffle service, and I suspect that is what causes my strange issue.

Shouldn't a configuration always be constructed and sent back?

I don't recall off the top of my head what the intended behavior was with RetrieveSparkAppConfig.
Can you describe a case where this happens? Maybe post the spark-submit command line you're using as well?
We have tested runs that last 10+ minutes and haven't seen that issue with the applications.

OK, I will gather some information about my k8s setup and the submit command on Friday. I don't have access to them right now.

When shuffleManager.isDefined is false, isDefinedAt returns false for RetrieveSparkAppConfig, so the message falls through orElse(super.receiveAndReply(context)) and the parent handler sends the reply. So a reply is always expected.
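To illustrate why the guard does not swallow the message, here is a minimal, self-contained sketch of the same `PartialFunction` + `orElse` composition. The message classes, `parentHandler`, `k8sHandler`, and the reply callback are hypothetical stand-ins (not Spark's real classes or RPC API): `parentHandler` plays the role of `super.receiveAndReply(context)`, and the callback plays the role of `context.reply`.

```scala
// Hypothetical stand-ins for Spark's message types; not the real Spark classes.
final case class RetrieveSparkAppConfig(executorId: String)
final case class SparkAppConfig(props: Map[String, String])

object OrElseDemo {
  // Stand-in for the parent's receiveAndReply: always answers RetrieveSparkAppConfig.
  def parentHandler(reply: Any => Unit): PartialFunction[Any, Unit] = {
    case RetrieveSparkAppConfig(_) => reply(SparkAppConfig(Map("source" -> "default")))
  }

  // Stand-in for the Kubernetes override: the guard makes isDefinedAt false when
  // shuffle is disabled, so orElse hands the message to the parent handler instead.
  def k8sHandler(shuffleEnabled: Boolean, reply: Any => Unit): PartialFunction[Any, Unit] = {
    val own: PartialFunction[Any, Unit] = {
      case RetrieveSparkAppConfig(_) if shuffleEnabled =>
        reply(SparkAppConfig(Map("source" -> "k8s-shuffle")))
    }
    own.orElse(parentHandler(reply))
  }

  // Sends one message through the composed handler and returns what was replied, if anything.
  def ask(shuffleEnabled: Boolean): Option[Any] = {
    var replied: Option[Any] = None
    val handler = k8sHandler(shuffleEnabled, r => replied = Some(r))
    val msg = RetrieveSparkAppConfig("1")
    if (handler.isDefinedAt(msg)) handler(msg)
    replied
  }

  def main(args: Array[String]): Unit = {
    println(ask(shuffleEnabled = true))
    println(ask(shuffleEnabled = false))
  }
}
```

In both cases a reply is produced; only its source differs, which matches the point that the executor's `askSync` should always get an answer regardless of the shuffle service setting.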

Hmm, that's right. I missed the orElse part. Now I see the default handling of RetrieveSparkAppConfig, so I will need to debug my issue a bit more.