Apache Spark is a fast, powerful framework that provides an API for massive distributed processing over resilient sets of data. The main abstraction Spark provides is the resilient distributed dataset (RDD), the fundamental data type of the engine. Spark SQL is Apache Spark's module for working with structured data, and MLlib is Apache Spark's scalable machine learning library. Apache Spark itself is written in the Scala programming language. To support Python with Spark, the Apache Spark community released PySpark, which offers computation speed and power similar to Scala. PySpark is a parallel and distributed engine for running big data applications, and it lets you work with RDDs in the Python programming language.
This tutorial explains how to set up and run Jupyter Notebooks from within IBM® Watson™ Studio. We use two datasets, 5000_points.txt and people.csv, that are available on GitHub, along with a corresponding Getting Started with PySpark notebook.
In this tutorial, you will learn:
- Big Data analysis with PySpark
- Using SQL queries with DataFrames through the Spark SQL module
- Machine learning with MLlib library
To complete the tutorial, you need an IBM Cloud account. You can obtain a free trial account, which gives you access to IBM Cloud and IBM Watson Studio.
It should take you approximately 60 minutes to complete this tutorial.
An Object Storage service is required to create projects in Watson Studio. If you do not already have a storage service provisioned, complete the following steps:
- From your IBM Cloud account, search for “object storage” in the IBM Cloud Catalog. Then, click the Object Storage tile.
- Choose the Lite plan and click the Create button.
- From your IBM Cloud account, search for “watson studio” in the IBM Cloud Catalog. Then, click the Watson Studio tile.
- Choose the Lite plan and click the Create button.
- Click Get Started.
- Click either Create a project or New project.
- Select Create an empty project.
- In the New project window, name the project (for example, “Getting Started with PySpark”).
- For Storage, you should select the IBM Cloud Object Storage service you created in the previous step. If it is the only storage service that you have provisioned, it is assigned automatically.
- Click Create.
Next, you’ll download the datasets from GitHub and upload them to Watson Studio.
- Navigate to the URL for the datasets on GitHub (https://github.com/emrekutlug/Getting-started-with-PySpark/tree/master/datasets) and download the files to your local desktop.
- In Watson Studio, select Assets.
- If not already open, click the 1001 data icon at the upper right of the panel to open the Files sub-panel. Then, click Load.
- Drag the files to the drop area or browse for files to upload the data into Watson Studio.
- Wait until the files have been uploaded.
Create a Jupyter Notebook and change it to use the data set that you have uploaded to the project.
- In the Assets tab, click Add to Project.
- Select the Notebook asset type.
- On the New Notebook page, configure the notebook as follows:
  - Select the From URL tab.
  - Enter a name for the notebook (for example, ‘getting-started-with-pyspark’).
  - Select the Spark Python 3.6 runtime system.
  - Enter the following URL for the notebook:
    https://github.com/emrekutlug/Getting-started-with-PySpark/blob/master/getting_started_with_pyspark.ipynb
- Click Create Notebook. This initiates the loading and running of the notebook within IBM Watson Studio.
The notebook page should be displayed.
If the notebook is not currently open, you can start it by clicking the Edit icon displayed next to the notebook in the Asset page for the project:
Spark comes with an interactive Python shell that has PySpark already installed, and the shell automatically creates a SparkContext for you. SparkContext is the entry point into the world of Spark: it is how your program connects to a Spark cluster. In a notebook, the SparkContext is available through the sc variable. In the following examples, we retrieve the version of the SparkContext and the Python version it uses.
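A minimal sketch of such a cell (the sc variable is already provided by the Spark notebook runtime):

```python
# `sc` is the SparkContext that the PySpark shell or Spark notebook creates for you
print("Spark version of SparkContext:", sc.version)
print("Python version of SparkContext:", sc.pythonVer)
print("Master of SparkContext:", sc.master)   # cluster URL, or "local[*]" in local mode
```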
Lambda functions are anonymous functions in Python: they are not bound to a name at runtime and are returned without a name. They are usually used where a small function is needed only once, most commonly with the map and filter methods. In the following example, we use a lambda function with the map and filter methods.
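For illustration, a small cell along these lines shows the idea with plain Python lists (the variable names are just examples):

```python
nums = [1, 2, 3, 4, 5]

# map applies the lambda to every element
squares = list(map(lambda x: x ** 2, nums))        # [1, 4, 9, 16, 25]

# filter keeps only the elements for which the lambda returns True
evens = list(filter(lambda x: x % 2 == 0, nums))   # [2, 4]

print(squares, evens)
```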
RDDs are collections of data partitioned and distributed across a cluster of computers, with each partition processed on a different node in the cluster. RDDs are the most basic data structure of Spark. We can create an RDD from an existing Python object, such as a list, by passing it to SparkContext's parallelize method. In the following example, we create a list of numbers and build an RDD from it.
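A sketch of that step (the list contents and the numRDD name are illustrative):

```python
# Create an RDD from an existing Python list
numb = [1, 2, 3, 4, 5]
numRDD = sc.parallelize(numb)

print(type(numRDD))   # <class 'pyspark.rdd.RDD'>
```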
Transformations and actions are the two types of operations in Spark. Transformations create new RDDs, while actions perform computation on RDDs and return results. map, filter, flatMap, and union are basic RDD transformations; collect, take, first, and count are basic RDD actions. In the following example, we create an RDD named numRDD from a list, use the map transformation to create a new RDD named cubeRDD, and finally use the collect action to return a list that contains all of the elements of cubeRDD.
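That sequence might look roughly like this:

```python
# map is a transformation: it defines cubeRDD but computes nothing yet
numRDD = sc.parallelize([1, 2, 3, 4])
cubeRDD = numRDD.map(lambda x: x ** 3)

# collect is an action: it triggers the computation and returns the elements
print(cubeRDD.collect())   # [1, 8, 27, 64]
```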
A pair RDD is a special type of RDD for working with datasets of key/value pairs. All regular transformations work on pair RDDs. In the following example, we create a pair RDD from four tuples of two numbers each; in each tuple, the first number is the key and the second is the value. We then apply the reduceByKey transformation, which combines the values that share the same key, so it adds up the values of tuples with the same key.
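A sketch of that example (the tuple values are chosen only for illustration):

```python
# A pair RDD of (key, value) tuples
Rdd = sc.parallelize([(1, 2), (3, 4), (3, 6), (4, 5)])

# reduceByKey combines the values that share the same key
Rdd_Reduced = Rdd.reduceByKey(lambda x, y: x + y)
print(Rdd_Reduced.collect())   # [(1, 2), (3, 10), (4, 5)] -- order may vary
```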
We can sort the tuples by key using the sortByKey transformation, as in the following example.
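Continuing with the same pair RDD, the sort might look like this:

```python
# Sort the reduced pair RDD by key (descending here; ascending=True is the default)
print(Rdd_Reduced.sortByKey(ascending=False).collect())   # [(4, 5), (3, 10), (1, 2)]
```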
We can count the number of tuples with the same key using the countByKey action. In the following example, we see (3, 2) in the output because there are two tuples with key 3 in the pair RDD.
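For example:

```python
# countByKey is an action that returns a dictionary-like object of counts per key
total = Rdd.countByKey()
print(list(total.items()))   # [(1, 1), (3, 2), (4, 1)] -> key 3 appears in two tuples
```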
SparkContext is the main entry point for creating RDDs, while SparkSession provides a single point of entry for working with Spark DataFrames. SparkSession is used to create DataFrames, register DataFrames as tables, and execute SQL queries. In PySpark notebooks, the SparkSession is available through the spark variable. In the following examples, we retrieve the SparkSession version and other information about it.
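A minimal sketch of such a cell (the spark variable is provided by the notebook runtime):

```python
# `spark` is the SparkSession that the Spark notebook creates for you
print("SparkSession version:", spark.version)
print(spark)                          # shows the SparkSession object itself
print(spark.sparkContext.appName)     # name of the underlying Spark application
```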
Spark SQL, the Spark module for structured data processing, provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. In the following example, we create an RDD from a list and then create a PySpark DataFrame using SparkSession's createDataFrame method. When we check the type of the DataFrame, we see pyspark.sql.dataframe.DataFrame as the output, and we can use the show method to print out its contents.
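A sketch of that cell, with illustrative names and values:

```python
# Build a DataFrame from an RDD of (name, age) tuples
sample_list = [('Mona', 20), ('Jennifer', 34), ('John', 20), ('Jim', 26)]
rdd = sc.parallelize(sample_list)
names_df = spark.createDataFrame(rdd, schema=['Name', 'Age'])

print(type(names_df))   # <class 'pyspark.sql.dataframe.DataFrame'>
names_df.show()
```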
We can also use external datasets in the notebook. To do this, select the empty cell below. If not already open, click the 1001 data icon at the upper part of the page to open the Files subpanel. In the right part of the page, select the people.csv dataset, click Insert to code, and then click Insert SparkSession DataFrame.
You can delete the df_data_1.take(5) line, copy the cos.url('file_name', 'bucket_name') expression from the inserted code above it, and use it as the value of the path_people variable that is commented out in the notebook. cos.url('file_name', 'bucket_name') is the path to your file, and you can access the file through this path.
You can add the 5000_points.txt dataset by following the same procedure, but click Insert to code and then click Insert credentials. Use the "file" and "bucket" values from the inserted credentials inside the path_5000 = cos.url('file_name', 'bucket_name') expression, which is also commented out in the notebook.
We can create a PySpark DataFrame using SparkSession's read.csv method, passing the path of the CSV file as an argument. The show method prints the first 20 rows of the DataFrame, count returns the number of rows, the columns attribute lists the columns, and printSchema prints the type of each column and whether it can contain null values.
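A sketch of that cell, assuming path_people holds the cos.url(...) path from the previous step and that the CSV file has a header row:

```python
# Read people.csv into a PySpark DataFrame
people_df = spark.read.csv(path_people, header=True, inferSchema=True)

people_df.show()            # prints the first 20 rows
print(people_df.count())    # number of rows
print(people_df.columns)    # list of column names
people_df.printSchema()     # column types and whether they are nullable
```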
We can use the select method to choose particular columns of a DataFrame. If we pass a number to the show method, it prints that many rows; in the following example, it prints 10 rows. The dropDuplicates method removes duplicate rows from a DataFrame, and we can use the count action to see how many rows were dropped.
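For example, assuming people.csv contains columns such as name, sex, and date of birth:

```python
# Select a subset of columns and print only the first 10 rows
people_df_sub = people_df.select('name', 'sex', 'date of birth')
people_df_sub.show(10)

# Remove duplicate rows and compare the row counts
people_df_sub_nodup = people_df_sub.dropDuplicates()
print(people_df_sub.count(), people_df_sub_nodup.count())
```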
We can filter rows based on a condition by using the filter transformation, as in the following example.
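A sketch, again assuming a sex column with female/male values:

```python
# Keep only the rows whose sex column equals "female"
people_df_female = people_df.filter(people_df.sex == 'female')
people_df_female.show(5)
```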
We can group rows based on the values of a column by using the groupBy transformation, as in the following example.
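For example, again using the assumed sex column:

```python
# Count how many rows fall into each value of the sex column
people_df.groupBy('sex').count().show()
```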
We can rename a column in a DataFrame by using the withColumnRenamed transformation.
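For example:

```python
# Rename the 'sex' column to 'gender' (column names assumed from people.csv)
people_df_renamed = people_df.withColumnRenamed('sex', 'gender')
print(people_df_renamed.columns)
```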
We can also use SQL queries to achieve the same results with DataFrames. First, we create a temporary table by using the createOrReplaceTempView method, passing the name of the temporary table as an argument. Then, we can pass any query we want to execute to SparkSession's sql method, as in the following example.
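A sketch of that pattern (the view name and query are illustrative):

```python
# Register the DataFrame as a temporary view, then query it with SQL
people_df.createOrReplaceTempView('people')

query = "SELECT name, sex FROM people WHERE sex = 'female'"
spark.sql(query).show(10)
```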
The second and most common way to create RDDs is from an external dataset. To do this, we can use SparkContext's textFile method, passing the path of the dataset as an argument. In the following example, we load the 5000_points.txt dataset.
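A sketch, assuming path_5000 holds the cos.url(...) path to 5000_points.txt:

```python
# Load the text file as an RDD of lines
clusters_rdd = sc.textFile(path_5000)

print(clusters_rdd.count())   # number of lines (points) in the file
print(clusters_rdd.first())   # each line holds two whitespace-separated integers
```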
We can further transform the resulting RDD by splitting each line and converting the two columns to integers.
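For example:

```python
# Split each line on whitespace and convert both columns to integers
rdd_split_int = clusters_rdd.map(lambda line: [int(v) for v in line.split()])
print(rdd_split_int.take(3))
```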
PySpark MLlib is Apache Spark's scalable machine learning library in Python, consisting of common learning algorithms and utilities. We use the K-means algorithm from MLlib to cluster the data in the 5000_points.txt dataset. First, we define an error method that calculates the distance from every point to the center of the cluster it belongs to.
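One way to write such an error function is sketched below; it assumes the trained KMeansModel is available in a variable named model (created in the next step):

```python
from math import sqrt

def error(point):
    """Distance from a point to the centre of the cluster it is assigned to."""
    center = model.clusterCenters[model.predict(point)]
    return sqrt(sum([x ** 2 for x in (point - center)]))
```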
We train the model with four different numbers of clusters, from 13 to 16, and calculate the error for each. As you can see in the output, 16 clusters gives the minimum error, so we retrain the model with that number of clusters. We then use the clusterCenters attribute to see the centers of all clusters.
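A sketch of the training loop and the retraining step (rdd_split_int and error come from the previous cells; seed=1 is an illustrative choice):

```python
from pyspark.mllib.clustering import KMeans

# Train K-means for k = 13..16 and compute the total error for each k
for k in range(13, 17):
    model = KMeans.train(rdd_split_int, k, seed=1)
    wssse = rdd_split_int.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("k = %d, error = %.2f" % (k, wssse))

# Retrain with the k that gave the smallest error and inspect the cluster centres
model = KMeans.train(rdd_split_int, k=16, seed=1)
cluster_centers = model.clusterCenters
print(cluster_centers)
```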
We can again use SparkSession's createDataFrame method to create a DataFrame from the RDD. To visualize the data, we must convert the PySpark DataFrame to a pandas DataFrame using the toPandas method. We create another pandas DataFrame from the list of cluster centers. Then, using matplotlib's scatter method, we can plot the clusters and their centers.
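A sketch of the visualization step (the column names col1 and col2 are illustrative):

```python
import pandas as pd
import matplotlib.pyplot as plt

# PySpark DataFrame from the RDD of points, converted to a pandas DataFrame
points_df = spark.createDataFrame(rdd_split_int, schema=['col1', 'col2'])
points_pd = points_df.toPandas()

# pandas DataFrame from the list of cluster centres
centers_pd = pd.DataFrame([list(c) for c in cluster_centers], columns=['col1', 'col2'])

# Scatter plot of the points with the cluster centres overlaid
plt.scatter(points_pd['col1'], points_pd['col2'], s=5)
plt.scatter(centers_pd['col1'], centers_pd['col2'], color='red', marker='x', s=100)
plt.show()
```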
This tutorial covered big data processing with PySpark, the Python package for Spark programming. We worked with SparkContext, used the map and filter methods with lambda functions, created RDDs from Python objects and from external files, applied transformations and actions to RDDs and pair RDDs, used SparkSession to build PySpark DataFrames from RDDs and external files, ran SQL queries with DataFrames by using the Spark SQL module, and finally did machine learning with the PySpark MLlib library.