
Scala Spark Project

A Scala (v2.11) / Spark (v2.3) based project to experiment with map-reduce algorithms on graph-shaped big data.

Dependency resolution (only Apache Spark core), linking, and building for this project are handled via sbt.

To avoid duplicate and/or conflicting libraries, add the provided option to the org.apache.spark.spark-core dependency in the build.sbt file before launching the sbt assembly command that creates the .jar artifact.
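As a minimal sketch, the relevant part of build.sbt could look like the following; the project name and exact Spark patch version are assumptions based on the Scala 2.11 / Spark 2.3 combination stated above:

    // build.sbt (sketch)
    name := "ScalaSparkProject"        // assumed project name
    scalaVersion := "2.11.12"          // Scala 2.11 line

    // "provided" keeps spark-core out of the assembled .jar: the Spark runtime on the
    // cluster already ships these classes, which avoids duplicates/conflicts at assembly time.
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"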

Then you can launch the application using the spark-submit <jarFile> --appArguments command.

Command-line arguments

Usage: ScalaSparkProject.jar [--debug] [--local N_CORES] [--partitions N_PART] [--outputs] [--trasform MODE] [--dpr N_DIGITS] [--friends-rec N_RECS] [--triangles] --input GRAPHFILE

The last four optional arguments select the main program functions, also called jobs. If no job is specified on the command line, ALL jobs get executed (a sample invocation is shown after the option list below).
You can run the application locally and control the number of cores used with the --local option.
With the --outputs argument, the program saves the resulting RDDs in the folder ./outputs/<jobName>.

OPTIONS

  • --local N_CORES where N_CORES must be an Integer indicating the number of cores to use. Only when launching the application locally.
  • --partitions N_PARTS where N_PARTS must be an Integer > 0 indicating the number of partitions for the main edges RDD. Recommended value is #cores*#executors*1.75 (powerful flag to tune parallelism, use with caution).
  • --trasform MODE where MODE must be an Integer between 0 and 2 indicating the kind of per-vertex edge list to produce (0: outgoing links, 1: incoming links, 2: both).
  • --dpr N_DIGITS where N_DIGITS must be an Integer between 1 and 20 that shifts down the decimal digits of the tolerance value (e.g. --dpr 4 => tolerance = 1e-4).
  • --friends-rec N_RECS where N_RECS must be an Integer between 0 and 50 specifying the number of per-user/vertex friend recommendations to produce (0: emit all recommendations).
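A sample invocation combining the options above (the core count, partition count, tolerance digits and input file are placeholders, not tuned recommendations):

    spark-submit ScalaSparkProject.jar --local 4 --partitions 14 --outputs --trasform 2 --dpr 4 --input <graphfile.txt>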

How to deploy Spark jobs on Google Cloud Platform clusters

Configure cluster

  1. Create a project via the Cloud Console
  2. Download the Cloud SDK from the official source here and install it on your system
  3. Use the command gcloud config set project <projectID> (as explained here), where projectID is the ID associated with the project created at step 1.
  4. Go to the Dataproc web page and create a cluster named spark-cluster (the nearest zone is europe-west3-a, i.e. Frankfurt)
  5. You can access the master node of your cluster with the command gcloud compute ssh <yourGCPusername>@spark-cluster-m --zone europe-west3-a
  6. Run the command hdfs dfs -mkdir -p hdfs://spark-cluster-m/user/<yourGCPusername> to create your user folder in the HDFS filesystem (these commands are collected in the sketch below)
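For convenience, a sketch collecting the shell commands from steps 3, 5 and 6 above (with <projectID> and <yourGCPusername> as placeholders):

    gcloud config set project <projectID>
    gcloud compute ssh <yourGCPusername>@spark-cluster-m --zone europe-west3-a
    # once logged into the master node:
    hdfs dfs -mkdir -p hdfs://spark-cluster-m/user/<yourGCPusername>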

Deploy project

  • Build your ScalaSpark project into a .jar file (specify the spark-core dependency as "provided" in the build.sbt file, as shown above, and then run the sbt assembly command)
  • Copy to the cluster master node:
    • the application .jar with the command gcloud compute scp <path/of/app.jar> <yourGCPusername>@spark-cluster-m:/home/<yourGCPusername> --zone europe-west3-a
    • the graph dataset to operate on with the command gcloud compute scp <path/of/graphfile.txt> <yourGCPusername>@spark-cluster-m:/home/<yourGCPusername> --zone europe-west3-a
  • Upload the input graph file to HDFS:
    • SSH into the master node (point 5. of the previous section) and run the command hdfs dfs -put <graphfile.txt>
  • Run the Spark application on the master node via YARN using one of the two following ways:
    • SSH into the master node (point 5. of the previous section) and run the command spark-submit <app.jar> --appArguments
    • Submit a Dataproc job:
      • via the GCP Web Console in the Dataproc Job section
      • from the directory of your .jar: gcloud dataproc jobs submit spark --cluster spark-cluster --region europe-west3 --jar <app.jar> -- --appArguments (a concrete example follows this list)
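As an illustrative example, using the assembly name reported in the performance section below and application arguments that follow the usage shown earlier:

    gcloud dataproc jobs submit spark \
        --cluster spark-cluster --region europe-west3 \
        --jar ScalaSparkProject-assembly-0.1.jar \
        -- --outputs --partitions 148 --input soc-Epinions1.txt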

Monitor execution

The status of the applications running on your cluster can be monitored using several web pages.

First, set up SSH tunnelling to the master node of your cluster with the following commands:

  • gcloud compute ssh <yourGCPusername>@spark-cluster-m --zone europe-west3-a -- -D 1080 -N
  • /usr/bin/google-chrome --proxy-server="socks5://localhost:1080" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp/spark-cluster-m

Then, inside the Chrome browser, open one of the following URLs:

  • http://spark-cluster-m:8088 for the YARN Resource Manager
  • http://spark-cluster-m:9870 for the Hadoop (HDFS NameNode) UI
  • http://spark-cluster-m:18080 for the Spark History Server

Logs verbosity

The log verbosity level is defined in the /etc/spark/conf/log4j.properties file and can be overridden with the flag --driver-log-levels root=FATAL,com.example=INFO when submitting a job.
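For example, when submitting a Dataproc job (cluster, region and jar values mirror the deploy example above):

    gcloud dataproc jobs submit spark --cluster spark-cluster --region europe-west3 \
        --driver-log-levels root=FATAL,com.example=INFO \
        --jar ScalaSparkProject-assembly-0.1.jar -- --input soc-Epinions1.txt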

Output files

  • Inspect and/or retrieve output files from the HDFS filesystem in two ways:
    • using the GUI of the Hadoop web page
    • using the Hadoop CLI commands while SSHed into the master node: hdfs dfs -ls, etc. (see the sketch below)
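An illustrative sketch (the outputs/<jobName> path is an assumption based on the --outputs behaviour described earlier):

    hdfs dfs -ls outputs/<jobName>                # list the part files written by a job
    hdfs dfs -get outputs/<jobName> ./<jobName>   # copy them to the master node's local disk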

Executors and memory configuration

When running a Spark application we have to consider three fundamental (Spark/YARN) parameters to get the best performance:

  • --num-executors
  • --executor-cores
  • --executor-memory

The official Spark documentation and a couple of blog posts give some hints for an optimal configuration of these parameters; a sketch of how they are passed to spark-submit follows.
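As a sketch of how these parameters are passed, the values below are simply those of the launch command reported in the performance results further down, not a general recommendation:

    # disabling dynamic allocation lets --num-executors fix the executor count
    spark-submit \
        --conf spark.dynamicAllocation.enabled=false \
        --num-executors 17 --executor-cores 5 --executor-memory 19G \
        ScalaSparkProject-assembly-0.1.jar --input <graphfile.txt>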

Algorithms performance

  • Using the dataset soc-Epinions1.txt [75K nodes, 508K edges] (available from SNAP here)

    • Locally with local[8] (JVM memory -Xms10g -Xmx14g):
      • trasform(2): 2.0 seconds
      • dynamicPageRank(8): 2.0 seconds
      • friends_recommendations(0): 22.0 seconds
      • triangles_count_V4: 643.0 seconds = 10.71 min
    • Spark cluster of 1 master - 4 slaves (each with 2 CPU / 7.5GB RAM):
      • triangles_count_V1: 63 min
    • Spark cluster of 1 master - 5 slaves (each with 16 CPU / 16GB RAM):
      • friends_recommendations: 23 seconds
      • triangles_count_V4: 577.0 seconds = 9.6 min
      • triangles_count_V4 (repartition 85): 68.0 seconds
      • triangles_count_V4 (repartition 148): 55.0 seconds
      • triangles_count_V4 (repartition 148, HDFS input parallel loading): 53.0 seconds
        • launch command spark-submit --conf spark.dynamicAllocation.enabled=false --num-executors 17 --executor-cores 5 --executor-memory 19G ScalaSparkProject-assembly-0.1.jar --debug --outputs --partitions 148 --input soc-Epinions1.txt
        • trasform(2): 12 seconds
        • dynamicPageRank(8): 5 seconds
        • friends_recommendations(0): 19 seconds
  • Using the dataset soc-LiveJournal1.txt [4.8M nodes, 68M edges] (available from SNAP here)

    • Spark cluster of 1 master - 5 slaves (each with 16 CPU / 16GB RAM):
      • trasform(2): 38 seconds
      • dynamicPageRank(8): 14 seconds
      • friends_recommendations(0): 806 seconds = 13.4 minutes
      • triangles_count_V4: NA (due to OOM errors)