Spark cluster
szilard opened this issue · 13 comments
previous results, single server:

trees | depth | 100M time [s] | 100M AUC | 100M RAM [GB] | 10M time [s] | 10M AUC | 10M RAM [GB] |
---|---|---|---|---|---|---|---|
1 | 1 | 1150 | 0.634 | 620 | 70 | 0.635 | 110 |
1 | 10 | 1350 | 0.712 | 620 | 90 | 0.712 | 112 |
10 | 10 | 7850 | 0.731 | 780 | 830 | 0.731 | 125 |
100 | 10 | crash (OOM) | | >960 (OOM) | 8070 | 0.755 | 230 |
10M ran on:
r4.8xlarge (32 cores, 1 NUMA, 240GB RAM)
100M ran on:
x1e.8xlarge (32 cores, 1 NUMA, 960GB RAM)
first try: cluster with 1 master + 1 slave (same instance type):
```sh
# download the benchmark data and copy it into HDFS
wget https://s3.amazonaws.com/benchm-ml--main/train-10m.csv && \
wget https://s3.amazonaws.com/benchm-ml--main/test.csv
hadoop fs -copyFromLocal train-10m.csv /train-10m.csv
hadoop fs -copyFromLocal test.csv /test.csv

spark-shell
```
```scala
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.DoubleType

val loader = spark.read.format("com.databricks.spark.csv").option("header", "true")
val trainDF = loader.load("/train-10m.csv")
val testDF = loader.load("/test.csv")

// combine train and test so the one-hot encoding sees the same category levels in both
val fullDF0 = trainDF.withColumn("isTrain", lit(true)).unionAll(testDF.withColumn("isTrain", lit(false)))
val fullDF = fullDF0.withColumn("DepTime", col("DepTime").cast(DoubleType)).
  withColumn("Distance", col("Distance").cast(DoubleType))
fullDF.printSchema
fullDF.show(5)

// RFormula one-hot encodes the categorical columns and assembles the "features" vector and "label" column
val res = new RFormula().setFormula("dep_delayed_15min ~ . - isTrain").fit(fullDF).transform(fullDF)
res.printSchema
res.show(5)

// split back into train/test and persist as Parquet
val finalTrainDF = res.where(col("isTrain"))
val finalTestDF = res.where(!col("isTrain"))
finalTrainDF.write.mode("overwrite").parquet("/spark_ohe-train-10m.parquet")
finalTestDF.write.mode("overwrite").parquet("/spark_ohe-test.parquet")
```
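As a quick sanity check on the encoded output (a sketch, not part of the original session), one can inspect the feature vector that RFormula produced before writing the Parquet files:

```scala
// Hypothetical check: the "features" vector is much wider than the raw column
// count because RFormula one-hot encodes the string columns.
import org.apache.spark.ml.linalg.Vector
val v = res.select("features").head.getAs[Vector](0)
println(s"encoded feature vector size: ${v.size}")
```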
```scala
//// NEW: union 10 copies of the 10M training set to get 100M rows
val finalTrainDF_100m = finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF)))))))))
finalTrainDF_100m.count()
finalTrainDF_100m.write.mode("overwrite").parquet("/spark_ohe-train-100m.parquet")
```
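The nested `union` chain can be written more compactly; a sketch with the same semantics (one copy plus nine appended copies = 10x rows):

```scala
// Equivalent to the nested unions above.
val finalTrainDF_100m = (1 to 9).foldLeft(finalTrainDF)((acc, _) => acc.union(finalTrainDF))
```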
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val d_train = spark.read.parquet("/spark_ohe-train-10m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train.count(), d_test.count())   // force materialization of the cache

d_train.rdd.getNumPartitions

val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
  setMaxIter(10).setMaxDepth(10).setStepSize(0.1).
  setMaxBins(100).setMaxMemoryInMB(10240)   // max possible setMaxMemoryInMB (otherwise errors out)
val pipeline = new Pipeline().setStages(Array(gbm))

val now = System.nanoTime
val model = pipeline.fit(d_train)
val elapsed = (System.nanoTime - now) / 1e9
elapsed

val predictions = model.transform(d_test)
// AUC computed from the predicted probabilities
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("probability").setMetricName("areaUnderROC")
evaluator.evaluate(predictions)
```
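The `System.nanoTime` bookkeeping repeats for every run below; a small helper (an addition, not in the original session) keeps it in one place:

```scala
// Hypothetical helper: run a block and return its result with elapsed seconds.
def timed[A](body: => A): (A, Double) = {
  val t0 = System.nanoTime
  val result = body
  (result, (System.nanoTime - t0) / 1e9)
}

// usage: val (model, elapsed) = timed { pipeline.fit(d_train) }
```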
64 partitions (automatic):
run 1: 1180 sec, AUC = 0.7313, RAM 73GB (1 slave)
run 2: 1450 sec, AUC = 0.731, RAM 73GB
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

/// NEW: repartition to 32 to match the number of cores
val d_train0 = spark.read.parquet("/spark_ohe-train-10m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train0.count(), d_test.count())
d_train0.rdd.getNumPartitions

val d_train = d_train0.repartition(32).cache()
d_train.count()
d_train.rdd.getNumPartitions
```
32 partitions (matched to cores, manual):
1560 sec, AUC = 0.7298, RAM 72GB

local again:
830 sec, AUC = 0.7308, RAM 125GB

local with 64 partitions (manual):
830 sec, AUC = 0.7292, RAM 123GB
Spark settings for the 10-slave cluster runs below:

name | value |
---|---|
spark.driver.memory | 219695M |
spark.executor.instances | 10 |
spark.default.parallelism | 640 |
spark.submit.deployMode | client |
spark.master | yarn |
spark.executor.cores | 32 |
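These values can be read back from inside the shell (a sketch, assuming the usual `sc` session object):

```scala
// Print the executor settings the session actually picked up (keys as in the table above).
sc.getConf.get("spark.executor.instances")    // "10"
sc.getConf.get("spark.default.parallelism")   // "640"
sc.getConf.getAll.filter(_._1.startsWith("spark.executor")).foreach(println)
```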
```
scala> d_train.rdd.getNumPartitions
res1: Int = 585

scala> val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
     | setMaxIter(10).setMaxDepth(10).setStepSize(0.1).

scala> elapsed
res2: Double = 1825.94445773

scala> evaluator.evaluate(predictions)
res3: Double = 0.731131999798129
```
100M, 10 trees
single server - x1e.8xlarge (32 cores, 960GB RAM)
time [s] | AUC | RAM [GB] |
---|---|---|
7850 | 0.731 | 780 |
cluster with 10 slave nodes - r4.8xlarge (32 cores, 240GB RAM)
time [s] | AUC | RAM [GB] |
---|---|---|
1825 | 0.731 | 10*72 |
4.3x speedup in wall-clock time (though the fair comparison would be a 10-node cluster vs. a 1-node cluster, given the cluster overhead penalty seen above).
if the cluster penalty on 100M is the same as for 10M above (1.4-1.7x, i.e. a 1-slave cluster would take roughly 11000-13300 sec), then the speedup from 1 slave to 10 is about 6-7.3x
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

/// NEW: 100M rows, repartitioned to 320 to match total cluster cores
val d_train0 = spark.read.parquet("/spark_ohe-train-100m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train0.count(), d_test.count())
d_train0.rdd.getNumPartitions

val d_train = d_train0.repartition(320).cache()
d_train.count()
d_train.rdd.getNumPartitions
```
320 partitions: run stopped after 2 trees as it kept getting slower (hence the >1825 sec entry in the summary table below).

10M on the 10-node cluster, automatic partitioning:
```
scala> d_train.rdd.getNumPartitions
res1: Int = 117

scala> val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
     | setMaxIter(10).setMaxDepth(10).setStepSize(0.1).

scala> elapsed
res2: Double = 414.150597334

scala> evaluator.evaluate(predictions)
res3: Double = 0.7292960760201486
```
match partitions to cores:
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

/// NEW: 10M rows, repartitioned to 320 to match total cluster cores
val d_train0 = spark.read.parquet("/spark_ohe-train-10m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train0.count(), d_test.count())
d_train0.rdd.getNumPartitions

val d_train = d_train0.repartition(320).cache()
d_train.count()
d_train.rdd.getNumPartitions
```
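Instead of hard-coding 320, the partition count could be derived from the cluster settings (a sketch, assuming the YARN configuration shown earlier):

```scala
// With the configuration above: 10 executors * 32 cores = 320 partitions.
val totalCores = sc.getConf.getInt("spark.executor.instances", 1) *
  sc.getConf.getInt("spark.executor.cores", 1)
val d_train = d_train0.repartition(totalCores).cache()
```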
```
scala> d_train.rdd.getNumPartitions
res3: Int = 320

scala> val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
     | setMaxIter(10).setMaxDepth(10).setStepSize(0.1).

scala> elapsed
res4: Double = 327.705288398

scala> evaluator.evaluate(predictions)
res5: Double = 0.7304255377773654
```
10 trees, depth 10 ((m) = manually repartitioned):

size | system | node type | cores | partitions | time [s] | AUC | RAM [GB] | total RAM [GB] |
---|---|---|---|---|---|---|---|---|
10M | local | r4.8xl | 32 | 32 | 830 | 0.731 | 125 | 240 |
10M | local | r4.8xl | 32 | 64 (m) | 830 | 0.731 | 123 | 240 |
10M | Cluster_1 | r4.8xl | 32 | 64 | 1180 | 0.731 | 73 | 240 |
10M | Cluster_1 | r4.8xl | 32 | 32 (m) | 1560 | 0.73 | 72 | 240 |
10M | Cluster_10 | r4.8xl | 320 | 117 | 415 | 0.73 | | 2400 |
10M | Cluster_10 | r4.8xl | 320 | 320 (m) | 330 | 0.73 | | 2400 |
100M | local | x1e.8xl | 32 | | 7850 | 0.731 | 780 | 960 |
100M | Cluster_10 | r4.8xl | 320 | 585 | 1825 | 0.731 | 10*72 | 2400 |
100M | Cluster_10 | r4.8xl | 320 | 320 (m) | >1825 | | | 2400 |
best of partitioning (auto/manual):

size | system | node type | cores | partitions | time [s] | RAM [GB] | total RAM [GB] |
---|---|---|---|---|---|---|---|
10M | local | r4.8xl | 32 | 32 | 830 | 125 | 240 |
10M | Cluster_1 | r4.8xl | 32 | 64 | 1180 | 73 | 240 |
10M | Cluster_10 | r4.8xl | 320 | 320 (m) | 330 | | 2400 |
100M | local | x1e.8xl | 32 | | 7850 | 780 | 960 |
100M | Cluster_10 | r4.8xl | 320 | 585 | 1825 | 10*72 | 2400 |