/spark-adaptive

Primary LanguageScalaApache License 2.0Apache-2.0

Spark SQL Adaptive Execution

There are three main features in Adaptive Execution, including auto setting the shuffle partition number, optimizing join strategy at runtime and handling skewed join. These features can be enabled separately. To start with Adaptive Exection on Spark 2.3, please build branch ae-2.3-08 and at least set spark.sql.adaptive.enabled to true. For users who enabled external shuffle service, please also upgrade external shuffle service to use adaptive execution feature.

An Engilish version design doc is available on google doc. A Chinese version blog is available on CSDN that introduces the features and benchmark results. SPARK-23128 is the Jira for contributing this work to Apache Spark.

Auto Setting The Shuffle Partition Number

Property NameDefaultMeaning
spark.sql.adaptive.enabled false When true, enable adaptive query execution.
spark.sql.adaptive.minNumPostShufflePartitions 1 The minimum number of post-shuffle partitions used in adaptive execution. This can be used to control the minimum parallelism.
spark.sql.adaptive.maxNumPostShufflePartitions 500 The maximum number of post-shuffle partitions used in adaptive execution. This is also used as the initial shuffle partition number so please set it to an reasonable value.
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 67108864 The target post-shuffle input size in bytes of a task. By default is 64 MB.
spark.sql.adaptive.shuffle.targetPostShuffleRowCount 20000000 The target post-shuffle row count of a task. This only takes effect if row count information is collected.

Optimizing Join Strategy at Runtime

Property NameDefaultMeaning
spark.sql.adaptive.join.enabled true When true and spark.sql.adaptive.enabled is enabled, a better join strategy is determined at runtime.
spark.sql.adaptiveBroadcastJoinThreshold equals to spark.sql.autoBroadcastJoinThreshold Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join in adaptive exeuction mode. If not set, it equals to spark.sql.autoBroadcastJoinThreshold.

Handling Skewed Join

Property NameDefaultMeaning
spark.sql.adaptive.skewedJoin.enabled false When true and spark.sql.adaptive.enabled is enabled, a skewed join is automatically handled at runtime.
spark.sql.adaptive.skewedPartitionFactor 10 A partition is considered as a skewed partition if its size is larger than this factor multiple the median partition size and also larger than spark.sql.adaptive.skewedPartitionSizeThreshold, or if its row count is larger than this factor multiple the median row count and also larger than spark.sql.adaptive.skewedPartitionRowCountThreshold.
spark.sql.adaptive.skewedPartitionSizeThreshold 67108864 Configures the minimum size in bytes for a partition that is considered as a skewed partition in adaptive skewed join.
spark.sql.adaptive.skewedPartitionRowCountThreshold 10000000 Configures the minimum row count for a partition that is considered as a skewed partition in adaptive skewed join.
spark.shuffle.statistics.verbose false Collect shuffle statistics in verbose mode, including row counts etc. This is required for handling skewed join.