Firestorm is a Remote Shuffle Service that lets Apache Spark applications store shuffle data on remote servers.
Firestorm consists of a coordinator cluster, a shuffle server cluster and, if necessary, remote storage (e.g., HDFS).
The coordinator collects the status of the shuffle servers and assigns shuffle servers to each job.
A shuffle server receives shuffle data, merges it, and writes it to storage.
Depending on the situation, Firestorm supports three storage combinations: Memory & Local, Memory & Remote Storage (e.g., HDFS), and Memory & Local & Remote Storage (recommended for production environments).
- The Spark driver asks the coordinator for the shuffle servers to use for the shuffle process.
- Each Spark task writes shuffle data to the shuffle server in the following steps:
  - Send KV data to the buffer
  - Flush the buffer to the queue when the buffer is full or the buffer manager is full
  - A thread pool takes data from the queue
  - Request memory from the shuffle server first, then send the shuffle data
  - The shuffle server caches data in memory first and flushes it to a queue when the buffer manager is full
  - A thread pool takes data from the queue
  - Write data to storage as an index file and a data file
  - After the data is written, the task reports all blockIds to the shuffle server; this step is used for data validation later
  - Store the taskAttemptId in MapStatus to support Spark speculation
- Depending on the storage type, a Spark task reads shuffle data from the shuffle server, from remote storage, or from both.
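
The client-side half of the write steps (buffer, flush when full, hand over to a queue) can be pictured with a small sketch. This is an illustration only, not Firestorm's implementation: the class `WriteBufferManager`, its methods, and the tiny byte limits are hypothetical, and the real client would drain the queue from a thread pool instead of a loop.

```python
from collections import defaultdict
from queue import Queue


class WriteBufferManager:
    """Toy model of the client-side buffering steps (all names are hypothetical)."""

    def __init__(self, buffer_size, spill_size):
        self.buffer_size = buffer_size      # per-partition limit, in bytes
        self.spill_size = spill_size        # limit for all partitions together
        self.buffers = defaultdict(list)    # partition id -> buffered records
        self.used = defaultdict(int)        # partition id -> buffered bytes
        self.send_queue = Queue()           # flushed blocks waiting to be sent

    def add_record(self, partition, record: bytes):
        self.buffers[partition].append(record)
        self.used[partition] += len(record)
        if self.used[partition] >= self.buffer_size:
            # This partition's buffer is full: flush only this partition.
            self._flush(partition)
        elif sum(self.used.values()) >= self.spill_size:
            # The buffer manager as a whole is full: flush every partition.
            for p in list(self.buffers):
                self._flush(p)

    def _flush(self, partition):
        records = self.buffers.pop(partition, [])
        self.used.pop(partition, None)
        if records:
            self.send_queue.put((partition, b"".join(records)))


manager = WriteBufferManager(buffer_size=8, spill_size=12)
manager.add_record(0, b"aaaa")       # 4 bytes buffered for partition 0
manager.add_record(1, b"bbbb")       # 8 bytes in total, nothing flushed yet
manager.add_record(2, b"cccc")       # 12 bytes in total: the manager is full
flushed = []
while not manager.send_queue.empty():
    flushed.append(manager.send_queue.get())
```

The two limits play the roles that `spark.rss.writer.buffer.size` and `spark.rss.writer.buffer.spill.size` are described as having in the client configuration table.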
Shuffle data is stored as an index file and a data file. The data file holds all blocks for a specific partition, and the index file holds the metadata for every block.
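
To make the index/data split concrete, here is a minimal sketch of the idea. The entry layout (offset, length, block id) and the helper names are assumptions chosen for illustration; the source does not specify Firestorm's actual on-disk format, which may carry more fields.

```python
import io
import struct

# Hypothetical index entry: offset (8 bytes), length (4 bytes), block id (8 bytes).
INDEX_ENTRY = struct.Struct(">qiq")


def write_blocks(blocks, data_file, index_file):
    """Append each (block_id, payload) to the data file and record it in the index."""
    for block_id, payload in blocks:
        offset = data_file.tell()
        data_file.write(payload)
        index_file.write(INDEX_ENTRY.pack(offset, len(payload), block_id))


def read_blocks(data_file, index_file):
    """Use the index entries to slice every block back out of the data file."""
    index_bytes = index_file.getvalue()
    data_bytes = data_file.getvalue()
    result = {}
    for offset, length, block_id in INDEX_ENTRY.iter_unpack(index_bytes):
        result[block_id] = data_bytes[offset:offset + length]
    return result


# In-memory files stand in for the per-partition files on disk or HDFS.
data_file, index_file = io.BytesIO(), io.BytesIO()
write_blocks([(101, b"hello"), (102, b"shuffle!")], data_file, index_file)
blocks = read_blocks(data_file, index_file)
```

Because the index is small and fixed-width, a reader can load it first and then fetch only the byte ranges it needs from the much larger data file.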
Currently supports Spark 2.3.x, Spark 2.4.x, Spark 3.0.x, Spark 3.1.x, and Spark 3.2.x.
Note: To support dynamic allocation, the patch (included in the client-spark/patch folder) should be applied to Spark.
Currently supports the MapReduce framework of Hadoop 2.8.5.
Firestorm is built using Apache Maven. To build it, run:

    mvn -DskipTests clean package

To package Firestorm, run:

    ./build_distribution.sh

rss-xxx.tgz will be generated for deployment.
- unzip package to RSS_HOME
- update RSS_HOME/bin/rss-env.sh, e.g.,

      JAVA_HOME=<java_home>
      HADOOP_HOME=<hadoop home>
      XMX_SIZE="16g"
- update RSS_HOME/conf/coordinator.conf, e.g.,

      rss.rpc.server.port 19999
      rss.jetty.http.port 19998
      rss.coordinator.server.heartbeat.timeout 30000
      rss.coordinator.app.expired 60000
      rss.coordinator.shuffle.nodes.max 5
      # enable dynamicClientConf, and coordinator will be responsible for most of client conf
      rss.coordinator.dynamicClientConf.enabled true
      # config the path of client conf
      rss.coordinator.dynamicClientConf.path <RSS_HOME>/conf/dynamic_client.conf
      # config the path of excluded shuffle server
      rss.coordinator.exclude.nodes.file.path <RSS_HOME>/conf/exclude_nodes
- update <RSS_HOME>/conf/dynamic_client.conf; the rss client will get its default conf from the coordinator, e.g.,

      # MEMORY_LOCALFILE_HDFS is recommended for production environment
      rss.storage.type MEMORY_LOCALFILE_HDFS
      # multiple remote storages are supported, and client will get assignment from coordinator
      rss.coordinator.remote.storage.path hdfs://cluster1/path,hdfs://cluster2/path
      rss.writer.require.memory.retryMax 1200
      rss.client.retry.max 100
      rss.writer.send.check.timeout 600000
      rss.client.read.buffer.size 14m
- start Coordinator

      bash RSS_HOME/bin/start-coordinator.sh
- unzip package to RSS_HOME
- update RSS_HOME/bin/rss-env.sh, e.g.,

      JAVA_HOME=<java_home>
      HADOOP_HOME=<hadoop home>
      XMX_SIZE="80g"
- update RSS_HOME/conf/server.conf, e.g.,

      rss.rpc.server.port 19999
      rss.jetty.http.port 19998
      rss.rpc.executor.size 2000
      # it should be configured the same as in coordinator
      rss.storage.type MEMORY_LOCALFILE_HDFS
      rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
      # local storage path for shuffle server
      rss.storage.basePath /data1/rssdata,/data2/rssdata....
      # it's better to config thread num according to local disk num
      rss.server.flush.thread.alive 5
      rss.server.flush.threadPool.size 10
      rss.server.buffer.capacity 40g
      rss.server.read.buffer.capacity 20g
      rss.server.heartbeat.timeout 60000
      rss.server.heartbeat.interval 10000
      rss.rpc.message.max.size 1073741824
      rss.server.preAllocation.expired 120000
      rss.server.commit.timeout 600000
      rss.server.app.expired.withoutHeartbeat 120000
      # note: the default value of rss.server.flush.cold.storage.threshold.size is 64m
      # no data will be written to DFS if it is set to 100g, even with rss.storage.type=MEMORY_LOCALFILE_HDFS
      # please set a proper value if DFS is used, e.g., 64m, 128m.
      rss.server.flush.cold.storage.threshold.size 100g
- start Shuffle Server

      bash RSS_HOME/bin/start-shuffle-server.sh
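
The note on `rss.server.flush.cold.storage.threshold.size` in server.conf suggests that the threshold works as a size cut-off: a flush goes to DFS only when it is larger than the threshold. The sketch below illustrates that reading. It is an assumption, not the server's code: the function names are hypothetical, the exact comparison (`>` versus `>=`) is a guess, and size suffixes are assumed to be binary multiples.

```python
# Assumed binary multiples for the size suffixes used in the conf files.
UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def parse_size(text):
    """Convert a size such as '64m' or '100g' to bytes (plain digits pass through)."""
    text = text.strip().lower()
    if text[-1] in UNITS:
        return int(text[:-1]) * UNITS[text[-1]]
    return int(text)


def pick_storage(flush_bytes, threshold):
    """Assumed rule: flushes above the threshold go to HDFS, the rest to local files."""
    return "HDFS" if flush_bytes > parse_size(threshold) else "LOCALFILE"


big_flush = parse_size("256m")
with_default = pick_storage(big_flush, "64m")     # the documented default threshold
with_100g = pick_storage(big_flush, "100g")       # the value from the example conf
```

Under this reading, a 256m flush lands on HDFS with the default threshold but stays on local disk with `100g`, which matches the warning that no data reaches DFS with that setting.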
- Add client jar to Spark classpath, e.g., SPARK_HOME/jars/
The jar for Spark2 is located in <RSS_HOME>/jars/client/spark2/rss-client-XXXXX-shaded.jar
The jar for Spark3 is located in <RSS_HOME>/jars/client/spark3/rss-client-XXXXX-shaded.jar
- Update Spark conf to enable Firestorm, e.g.,

      spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
      spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
To support Spark dynamic allocation with Firestorm, the Spark code must be patched. Two patches, for spark-2.4.6 and spark-3.1.2, are provided in the spark-patches folder for reference.
After applying the patch and rebuilding Spark, add the following configuration to the Spark conf to enable dynamic allocation:
spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true
- Add client jar to the classpath of each NodeManager, e.g., /share/hadoop/mapreduce/
The jar for MapReduce is located in <RSS_HOME>/jars/client/mr/rss-client-mr-XXXXX-shaded.jar
- Update MapReduce conf to enable Firestorm, e.g.,

      -Dmapreduce.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999
      -Dyarn.app.mapreduce.am.command-opts=org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster
      -Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.RssMapOutputCollector
      -Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.RssShuffle
Note that the RssMRAppMaster will automatically disable slow start (i.e., mapreduce.job.reduce.slowstart.completedmaps=1) and job recovery (i.e., yarn.app.mapreduce.am.job.recovery.enable=false).
The important configurations are listed below.

Coordinator:

Property Name | Default | Description |
---|---|---|
rss.coordinator.server.heartbeat.timeout | 30000 | Timeout if no heartbeat is received from a shuffle server |
rss.coordinator.assignment.strategy | PARTITION_BALANCE | Strategy for assigning shuffle servers; PARTITION_BALANCE should be used for workload balance |
rss.coordinator.app.expired | 60000 | Application expiration time (ms); the heartbeat interval should be less than this value |
rss.coordinator.shuffle.nodes.max | 9 | The max number of shuffle servers used in an assignment |
rss.coordinator.dynamicClientConf.path | - | The path of the configuration file that holds the default conf for rss clients |
rss.coordinator.exclude.nodes.file.path | - | The path of the configuration file that lists the excluded nodes |
rss.coordinator.exclude.nodes.check.interval.ms | 60000 | Update interval (ms) for excluded nodes |
rss.rpc.server.port | - | RPC port for coordinator |
rss.jetty.http.port | - | Http port for coordinator |

Shuffle server:

Property Name | Default | Description |
---|---|---|
rss.coordinator.quorum | - | Coordinator quorum |
rss.rpc.server.port | - | RPC port for Shuffle server |
rss.jetty.http.port | - | Http port for Shuffle server |
rss.server.buffer.capacity | - | Max memory of buffer manager for shuffle server |
rss.server.memory.shuffle.highWaterMark.percentage | 75.0 | Threshold for spilling data to storage, as a percentage of rss.server.buffer.capacity |
rss.server.memory.shuffle.lowWaterMark.percentage | 25.0 | Threshold for keeping data in memory, as a percentage of rss.server.buffer.capacity |
rss.server.read.buffer.capacity | - | Max size of buffer for reading data |
rss.server.heartbeat.interval | 10000 | Heartbeat interval to Coordinator (ms) |
rss.server.flush.threadPool.size | 10 | Thread pool size for flushing data to file |
rss.server.commit.timeout | 600000 | Timeout when commit shuffle data (ms) |
rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS |
rss.server.flush.cold.storage.threshold.size | 64M | The data size threshold for choosing between LOCALFILE and HDFS if MEMORY_LOCALFILE_HDFS is used |
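
The two watermark percentages are easier to reason about as absolute sizes. The sketch below converts them for a given `rss.server.buffer.capacity` and then applies one plausible flush policy. The policy itself (flush the largest buffers first until usage falls to the low watermark) is an assumption made for illustration, and the function and buffer names are hypothetical.

```python
def watermarks(capacity_bytes, high_pct=75.0, low_pct=25.0):
    """Turn the two percentage settings into absolute byte thresholds."""
    return capacity_bytes * high_pct / 100, capacity_bytes * low_pct / 100


def pick_buffers_to_flush(buffer_sizes, capacity_bytes):
    """Assumed policy: once usage passes the high watermark, flush the largest
    buffers first until usage is at or below the low watermark."""
    high, low = watermarks(capacity_bytes)
    used = sum(buffer_sizes.values())
    if used <= high:
        return []
    chosen = []
    for name, size in sorted(buffer_sizes.items(), key=lambda kv: kv[1], reverse=True):
        if used <= low:
            break
        chosen.append(name)
        used -= size
    return chosen


GIB = 1024 ** 3
high, low = watermarks(40 * GIB)   # 40g is the capacity used in the server.conf example
buffers = {"shuffle-a": 20 * GIB, "shuffle-b": 8 * GIB, "shuffle-c": 4 * GIB}
to_flush = pick_buffers_to_flush(buffers, 40 * GIB)
```

With a 40g capacity and the default percentages, spilling would start above 30g of buffered data and stop once usage is back at or below 10g.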

Spark client:

Property Name | Default | Description |
---|---|---|
spark.rss.writer.buffer.size | 3m | Buffer size for single partition data |
spark.rss.writer.buffer.spill.size | 128m | Buffer size for total partition data |
spark.rss.coordinator.quorum | - | Coordinator quorum |
spark.rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS |
spark.rss.client.send.size.limit | 16m | The max data size sent to shuffle server |
spark.rss.client.read.buffer.size | 32m | The max data size read from storage |
spark.rss.client.send.threadPool.size | 10 | The number of threads for sending shuffle data to shuffle server |

MapReduce client:

Property Name | Default | Description |
---|---|---|
mapreduce.rss.coordinator.quorum | - | Coordinator quorum |
mapreduce.rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS |
mapreduce.rss.client.max.buffer.size | 3k | The max buffer size on the map side |
mapreduce.rss.client.read.buffer.size | 32m | The max data size read from storage |
Firestorm is under the Apache License Version 2.0. See the LICENSE file for details.
For more information about contributing issues or pull requests, see Firestorm Contributing Guide.
We provide free support for users of this project. To join the user WeChat group for further help and collaboration, scan the QR code or search for the WeChat ID xinghuojihua_01, add our assistant on WeChat, and leave the remark Firestorm. The assistant will help you join our WeChat group.