apache-spark-on-k8s/spark

Spark on kubernetes, driver and executor pods crashes abruptly on spark sql jobs.

kaushiksrinivas opened this issue · 0 comments

Below is the scenario being tested,

Job :
Spark sql job is written in scala, and to run on 1TB TPCDS BENCHMARK DATA which is in parquet,snappy format and hive tables created on top of it.

Cluster manager :
Kubernetes 1.9.7
spark-2.2.0-k8s-0.5.0-bin-2.7.3

Spark sql configuration :

Set 1 :
spark.executor.heartbeatInterval 20s
spark.executor.cores 4
spark.driver.cores 4
spark.driver.memory 15g
spark.executor.memory 15g
spark.cores.max 220
spark.rpc.numRetries 5
spark.rpc.retry.wait 5
spark.network.timeout 1800
spark.sql.broadcastTimeout 1200
spark.sql.crossJoin.enabled true
spark.sql.starJoinOptimization true
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenodeHA/tmp/spark-history
spark.sql.codegen true
spark.kubernetes.allocation.batch.size 30

Set 2 :
spark.executor.heartbeatInterval 20s
spark.executor.cores 4
spark.driver.cores 4
spark.driver.memory 11g
spark.driver.memoryOverhead 4g
spark.executor.memory 11g
spark.executor.memoryOverhead 4g
spark.cores.max 220
spark.rpc.numRetries 5
spark.rpc.retry.wait 5
spark.network.timeout 1800
spark.sql.broadcastTimeout 1200
spark.sql.crossJoin.enabled true
spark.sql.starJoinOptimization true
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenodeHA/tmp/spark-history
spark.sql.codegen true
spark.kubernetes.allocation.batch.size 30

Kryoserialiser is being used and with "spark.kryoserializer.buffer.mb" value of 64mb.
50 executors are being spawned using spark.executor.instances=50 submit argument.

Issues Observed:

Spark sql job is terminating abruptly and the drivers,executors are being killed randomly.
driver and executors pods gets killed suddenly the job fails.

Few different stack traces are found across different runs,

Stack Trace 1:
"2018-05-10 06:31:28 ERROR ContextCleaner:91 - Error cleaning broadcast 136
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)"
File attached : StackTrace1.txt

Stack Trace 2:
"org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.178.1.105:38039^M
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)^M
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:418)"
File attached : StackTrace2.txt

Stack Trace 3:
"18/05/10 11:21:17 WARN KubernetesTaskSetManager: Lost task 3.0 in stage 48.0 (TID 16486, 192.178.1.35, executor 41): FetchFailed(null, shuffleId=29, mapId=-1, reduceId=3, message=^M
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 29^M
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)^M
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)"
File attached : StackTrace3.txt

Stack Trace 4:
"ERROR KubernetesTaskSchedulerImpl: Lost executor 11 on 192.178.1.123: Executor lost for unknown reasons."
This is repeating constantly until the executors are dead completely without any stack traces.

Also, we see 18/05/11 07:23:23 INFO DAGScheduler: failed: Set()
what does this mean ? anything is wrong or it says failed set is empty that means no failure ?

Observations or changes tried out :

Monitored memory and CPU utilisation across executors and none of them are hitting the limits.
As per few readings and suggestions
spark.network.timeout was increased to 1800 from 600, but did not help.
Also, driver and executor memory overhead was kept default in set 1 of the config and it was 0.1*15g=1.5gb.
Increased this value also, explicitly to 4gb and reduced driver and executor memory values to 11gb from 15gb as per set 2.
this did not yield any valuable results, same failures are being observed.

SparkSql is being used to run the queries,
sample code lines :
val qresult = spark.sql(q)
qresult.show()
No manual repartitioning is being done in the code.

StackTrace1.txt
StackTrace2.txt
StackTrace3.txt
StackTrace4.txt