Much of this guide is built from the following sources.
There are older Spark instantiation methods under the Context names: `SparkContext`, `SQLContext`, and `HiveContext`. We don't need those with the `SparkSession` entry point introduced in Spark 2. We will use `SparkConf` to configure a few settings of our Spark environment. Finally, the Spark SQL functions are a must for writing optimized Spark code; I have elected to import them under the abbreviation `F`.
import os # used below to build the warehouse path
from pyspark.sql import SparkSession #, SQLContext https://spark.apache.org/docs/1.6.1/sql-programming-guide.html
from pyspark import SparkConf #, SparkContext if you don't want to use SparkSession
from pyspark.sql import functions as F # access to the sql functions https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions
With our methods imported, we can configure our session. These configurations are built to work with the Spark setup from docker_guide. We will generally use the default configuration values in our work. You can review all the Spark configuration options here.
- We will want to bind the Spark user interface to a known port that we have opened.
- As we will be using Postgres, we will need to provide the matching PostgreSQL JDBC jar, which can be found in docker_guide.
- The batch size of 10000 is the default. Lowering this value can avoid out-of-memory errors, while larger values can improve memory utilization and compression.
- We will want to specify our warehouse location so it doesn't default to the working directory of the Jupyter notebook.
- Finally, we can specify the driver memory available.
warehouse_location = os.path.abspath('../../../data/spark-warehouse') # make sure your path is set up correctly.

# Build the configuration
conf = (SparkConf()
    .set("spark.ui.port", "4041")
    .set("spark.jars", "/home/jovyan/scratch/postgresql-42.2.18.jar")
    .set("spark.sql.inMemoryColumnarStorage.compressed", "true") # the default, but let's make sure.
    .set("spark.sql.inMemoryColumnarStorage.batchSize", "10000") # the default batch size
    .set("spark.sql.warehouse.dir", warehouse_location) # set above
    .set("spark.driver.memory", "7g") # lower or raise depending on your system; in local mode the executors run inside the driver, so this is the main memory knob.
)

# Create the session (this replaces the older contexts)
# Adjust the number in "local[3]" to match your memory and processors; "local[*]" uses all cores.
spark = (SparkSession.builder
    .master("local[3]")
    .appName("test")
    .config(conf=conf)
    .getOrCreate())
- Spark's Catalyst optimizer plans and optimizes queries for us when we use the DataFrame methods instead of raw RDDs.
- Spark stores DataFrames internally in a compact binary format, so there is no need to serialize and deserialize data as it moves over the cluster.
The Apache Parquet format is optimal. The following table compares the savings as well as the speedup obtained by converting data into Parquet from CSV.
Dataset | Size on Amazon S3 | Query Run Time | Data Scanned | Cost |
---|---|---|---|---|
Data stored as CSV files | 1 TB | 236 seconds | 1.15 TB | $5.75 |
Data stored in Apache Parquet Format | 130 GB | 6.78 seconds | 2.51 GB | $0.01 |
Savings | 87% less when using Parquet | 34x faster | 99% less data scanned | 99.7% savings |
# Read and write Parquet files
In Spark, a DataFrame's interim computations are not stored in memory: DataFrames are evaluated lazily, only when an action is called. If we have expensive transformations, then using `cache()` (or `persist()` for finer control over the storage level) can speed up your EDA.