szilard/GBM-perf

Spark cluster

szilard opened this issue · 13 comments

previous results on a single server:

| trees | depth | 100M: time [s] | 100M: AUC | 100M: RAM [GB] | 10M: time [s] | 10M: AUC | 10M: RAM [GB] |
|---|---|---|---|---|---|---|---|
| 1 | 1 | 1150 | 0.634 | 620 | 70 | 0.635 | 110 |
| 1 | 10 | 1350 | 0.712 | 620 | 90 | 0.712 | 112 |
| 10 | 10 | 7850 | 0.731 | 780 | 830 | 0.731 | 125 |
| 100 | 10 | crash (OOM) | | >960 (OOM) | 8070 | 0.755 | 230 |

10M ran on:
r4.8xlarge (32 cores, 1 NUMA node, 240 GB RAM)

100M ran on:
x1e.8xlarge (32 cores, 1 NUMA node, 960 GB RAM)

first try: same instance type (r4.8xlarge), 1 master + 1 slave:

[screenshots]

wget https://s3.amazonaws.com/benchm-ml--main/train-10m.csv && \
wget https://s3.amazonaws.com/benchm-ml--main/test.csv

hadoop fs -copyFromLocal train-10m.csv /train-10m.csv
hadoop fs -copyFromLocal test.csv /test.csv

spark-shell
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.DoubleType

val loader = spark.read.format("com.databricks.spark.csv").option("header", "true")
val trainDF = loader.load("/train-10m.csv")
val testDF = loader.load("/test.csv")

val fullDF0 = trainDF.withColumn("isTrain", lit(true)).unionAll(testDF.withColumn("isTrain", lit(false)))

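// the csv reader (no inferSchema) loads every column as string,
// so DepTime and Distance are cast to numeric below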
val fullDF = fullDF0.withColumn("DepTime", col("DepTime").cast(DoubleType)).withColumn("Distance", col("Distance").cast(DoubleType))

fullDF.printSchema
fullDF.show(5)


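// RFormula one-hot encodes the string (categorical) columns and assembles the
// "features" vector and "label" column; "- isTrain" keeps the split flag out of the features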
val res = new RFormula().setFormula("dep_delayed_15min ~ . - isTrain").fit(fullDF).transform(fullDF)

res.printSchema
res.show(5)

val finalTrainDF = res.where(col("isTrain"))
val finalTestDF = res.where(!col("isTrain"))

finalTrainDF.write.mode("overwrite").parquet("/spark_ohe-train-10m.parquet")
finalTestDF.write.mode("overwrite").parquet("/spark_ohe-test.parquet")

//// NEW:

val finalTrainDF_100m = finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF.union(finalTrainDF)))))))))
finalTrainDF_100m.count()

finalTrainDF_100m.write.mode("overwrite").parquet("/spark_ohe-train-100m.parquet")
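The nested unions above can be written more compactly; an equivalent sketch producing the same 10 copies of the 10M training set:

```scala
// 10 copies of the 10M training set, same result as the nested unions above
val finalTrainDF_100m = (1 to 10).map(_ => finalTrainDF).reduce(_ union _)
```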

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val d_train = spark.read.parquet("/spark_ohe-train-10m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train.count(), d_test.count())

///

d_train.rdd.getNumPartitions

/////

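// 10 trees, depth 10, learning rate 0.1, 100 histogram bins (the 10-tree, depth-10 setting from the tables above)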
val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
  setMaxIter(10).setMaxDepth(10).setStepSize(0.1).
  setMaxBins(100).setMaxMemoryInMB(10240)     // max possible setMaxMemoryInMB (otherwise errors out)
val pipeline = new Pipeline().setStages(Array(gbm))

val now = System.nanoTime
val model = pipeline.fit(d_train)
val elapsed = ( System.nanoTime - now )/1e9
elapsed

val predictions = model.transform(d_test)

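// evaluate AUC on the test set, using the predicted probabilities as scores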
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("probability").setMetricName("areaUnderROC")
evaluator.evaluate(predictions)

64 partitions:

runs 1180 s, AUC = 0.7313, RAM 73 GB (1 slave)
2nd run: 1450 s, AUC = 0.731, RAM 73 GB


import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

/// NEW: 

val d_train0 = spark.read.parquet("/spark_ohe-train-10m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train0.count(), d_test.count())

d_train0.rdd.getNumPartitions
val d_train = d_train0.repartition(32).cache()
d_train.count()
d_train.rdd.getNumPartitions

32 partitions:

runs 1560 s, AUC = 0.7298, RAM 72 GB


local again:

runs 830 s, AUC = 0.7308, RAM 125 GB

local with 64 partitions:

runs 830 s, AUC = 0.7292, RAM 123 GB

compare cluster with local (both with 64 partitions):

local: 830 s, AUC = 0.7292, RAM 123 GB
cluster: 1180-1450 s, AUC = 0.7313, RAM 73 GB (1 slave)
cluster is 1.4-1.7x slower

local:

[screenshots]

cluster:

[screenshots]

cluster with 10 slave nodes:

[screenshot]

10M records: after reading the parquet, the auto partition count is 117 (vs 320 total cores), so running the larger dataset instead:

100M records: after reading the parquet, the auto partition count is 585 (vs 320 total cores)
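A note on where the auto count comes from: for a parquet scan it follows from the file layout and split size, not from the core count. A minimal sketch, assuming Spark 2.x defaults:

```scala
// the scan splits files into chunks of spark.sql.files.maxPartitionBytes
// (128 MB by default), so the partition count tracks data size, not cores
spark.conf.get("spark.sql.files.maxPartitionBytes")

// coalesce() reduces the partition count without a full shuffle;
// repartition() (used below) shuffles, but balances partitions evenly
val d_coalesced = spark.read.parquet("/spark_ohe-train-100m.parquet").coalesce(320)
d_coalesced.rdd.getNumPartitions
```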

| name | value |
|---|---|
| spark.driver.memory | 219695M |
| spark.executor.instances | 10 |
| spark.default.parallelism | 640 |
| spark.submit.deployMode | client |
| spark.master | yarn |
| spark.executor.cores | 32 |
scala> d_train.rdd.getNumPartitions
res1: Int = 585

scala> val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
     |   setMaxIter(10).setMaxDepth(10).setStepSize(0.1).

scala> elapsed
res2: Double = 1825.94445773

scala> evaluator.evaluate(predictions)
res3: Double = 0.731131999798129

100M, 10 trees

single server - x1e.8xlarge (32 cores, 960 GB RAM)

| time [s] | AUC | RAM [GB] |
|---|---|---|
| 7850 | 0.731 | 780 |

cluster with 10 slave nodes - r4.8xlarge (32 cores, 240 GB RAM each)

| time [s] | AUC | RAM [GB] |
|---|---|---|
| 1825 | 0.731 | 10*72 |

4.3x time ratio (though the proper comparison would be a 10-node cluster vs a 1-node cluster, since we've seen the cluster setup itself carries an overhead penalty)

if the cluster penalty is the same as for 10M above (1.4-1.7x), that implies a 6-7.3x speedup from 1 slave to 10; the arithmetic is sketched below
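A minimal sketch of that arithmetic, using the timings from above:

```scala
val t_single  = 7850.0   // 100M, 10 trees on the single x1e.8xlarge [s]
val t_cluster = 1825.0   // 100M, 10 trees on the 10-slave cluster [s]
t_single / t_cluster             // ≈ 4.3x raw time ratio
(t_single * 1.4) / t_cluster     // ≈ 6.0x, assuming the 1.4x cluster penalty
(t_single * 1.7) / t_cluster     // ≈ 7.3x, assuming the 1.7x cluster penalty
```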

[screenshots]


import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

/// NEW: 

val d_train0 = spark.read.parquet("/spark_ohe-train-100m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train0.count(), d_test.count())

d_train0.rdd.getNumPartitions
val d_train = d_train0.repartition(320).cache()
d_train.count()
d_train.rdd.getNumPartitions

320 partitions:

run stopped after 2 trees as it was getting slower:

[screenshots]

10M:

scala> d_train.rdd.getNumPartitions
res1: Int = 117

scala> val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
     |   setMaxIter(10).setMaxDepth(10).setStepSize(0.1).

scala> elapsed
res2: Double = 414.150597334

scala> evaluator.evaluate(predictions)
res3: Double = 0.7292960760201486

match partitions to cores:


import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

/// NEW: 

val d_train0 = spark.read.parquet("/spark_ohe-train-10m.parquet").cache()
val d_test = spark.read.parquet("/spark_ohe-test.parquet").cache()
(d_train0.count(), d_test.count())

d_train0.rdd.getNumPartitions
val d_train = d_train0.repartition(320).cache()
d_train.count()
d_train.rdd.getNumPartitions
scala> d_train.rdd.getNumPartitions
res3: Int = 320

scala> val gbm = new GBTClassifier().setLabelCol("label").setFeaturesCol("features").
     |   setMaxIter(10).setMaxDepth(10).setStepSize(0.1).

scala> elapsed
res4: Double = 327.705288398

scala> evaluator.evaluate(predictions)
res5: Double = 0.7304255377773654

10 trees, depth 10:

| size | system | instance | cores | partitions | time [s] | AUC | RAM [GB] | total RAM [GB] |
|---|---|---|---|---|---|---|---|---|
| 10M | local | r4.8xl | 32 | 32 | 830 | 0.731 | 125 | 240 |
| 10M | local | r4.8xl | 32 | 64 (m) | 830 | 0.731 | 123 | 240 |
| 10M | Cluster_1 | r4.8xl | 32 | 64 | 1180 | 0.731 | 73 | 240 |
| 10M | Cluster_1 | r4.8xl | 32 | 32 (m) | 1560 | 0.73 | 72 | 240 |
| 10M | Cluster_10 | r4.8xl | 320 | 117 | 415 | 0.73 | | 2400 |
| 10M | Cluster_10 | r4.8xl | 320 | 320 (m) | 330 | 0.73 | | 2400 |
| 100M | local | x1e.8xl | 32 | | 7850 | 0.731 | 780 | 960 |
| 100M | Cluster_10 | r4.8xl | 320 | 585 | 1825 | 0.731 | 10*72 | 2400 |
| 100M | Cluster_10 | r4.8xl | 320 | 320 (m) | >1825 | | | 2400 |

(m) = manually repartitioned to match cores

best of partitioning (auto vs manual):

| size | system | instance | cores | partitions | time [s] | RAM [GB] | total RAM [GB] |
|---|---|---|---|---|---|---|---|
| 10M | local | r4.8xl | 32 | 32 | 830 | 125 | 240 |
| 10M | Cluster_1 | r4.8xl | 32 | 64 | 1180 | 73 | 240 |
| 10M | Cluster_10 | r4.8xl | 320 | 320 (m) | 330 | | 2400 |
| 100M | local | x1e.8xl | 32 | | 7850 | 780 | 960 |
| 100M | Cluster_10 | r4.8xl | 320 | 585 | 1825 | 10*72 | 2400 |