A Scala spark-shell backend for Org-mode’s Babel.
The only way I currently use this in my workflow is to create case classes for tests when writing new Spark jobs. First, I load the parquet file and print its schema (which is really fast). Second, I create a case class for my unit test based on the printed schema. Finally, I run df.as[T].show(1) (where T is the new case class), which either shows the dataframe or throws an exception when the case class doesn’t conform to the production schema. A sketch of this workflow is shown below.
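Here is a minimal sketch of that workflow, assuming a hypothetical parquet file at data/events.parquet and a hypothetical Event case class; inside spark-shell the spark session is already bound:

// Step 1: load the parquet file and print its schema (the path is hypothetical).
val df = spark.read.load("data/events.parquet")
df.printSchema

// Step 2: write a case class for the unit test that mirrors the printed schema.
case class Event(id: Long, userId: String, amount: Double)

// Step 3: try to read the dataframe as that case class; this shows one row
// when the class conforms to the schema and throws an exception otherwise.
import spark.implicits._
df.as[Event].show(1)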
To be honest, for the time being I’m using IntelliJ to write the tests and jobs. But having the Org-mode document available to quickly inspect production data gives the development of new ETL jobs a speed boost. I’m looking forward to using this to run ad-hoc jobs on an external cluster, too. I do not recommend using this on a cluster, yet.
At this moment, it is not obvious how to connect to an external cluster. With some CLI arguments (see the options below), however, you can connect to any kind of cluster using the --master argument. Additionally, I once ran a remote spark-shell through an SSH tunnel (which in turn ran spark-shell through Docker). This is possible by supplying ssh as the program instead of spark-shell and passing the appropriate arguments. I will add easy access to this functionality in the future, once the project is more stable.
How does it work? Please check out the raw version of this README to see the underlying Org-mode code.
In the following example, we will first create a dataframe ourselves with two people in it. After that, we read some data from a parquet file. To end the example, we join the two dataframes.
Create a dataframe with people using a Scala case class.
case class Person(name: String, age: Int)
val people = Seq(
  Person("Fred", 23),
  Person("Sally", 42))
val peopleDf = spark.createDataFrame(people)
defined class Person
people: Seq[Person] = List(Person(Fred,23), Person(Sally,42))
peopleDf: org.apache.spark.sql.DataFrame = [name: string, age: int]
How many people are in this dataframe?
peopleDf.count
res39: Long = 2
ob-spark-shell recognizes Spark table outputs and returns them as Org-mode tables.
peopleDf.show
| name  | age |
|-------|-----|
| Fred  | 23  |
| Sally | 42  |
We store the location of a parquet file in the passions-parquet variable, below.
doc/passions.parquet
In the following code block, we expose the passions-parquet variable (as passions_parquet) to the spark-shell, then we load the parquet file and count the results.
val passionsDf = spark.read.load(passions_parquet)
passionsDf.count
passionsDf: org.apache.spark.sql.DataFrame = [person: string, thing: string]
res43: Long = 2
What is in the dataframe?
passionsDf.show
| person | thing     |
|--------|-----------|
| Sally  | Ice cream |
| Fred   | Pizza     |
To finish this example, let’s join the two dataframes we have created so far.
import org.apache.spark.sql.functions._
val df = peopleDf.
  join(passionsDf, $"name" === $"person").
  select($"person", $"age", $"thing".as("likes"))
import org.apache.spark.sql.functions._
df: org.apache.spark.sql.DataFrame = [person: string, age: int ... 1 more field]
Now we have everything in one place :-)
df.show
| person | age | likes     |
|--------|-----|-----------|
| Fred   | 23  | Pizza     |
| Sally  | 42  | Ice cream |
- ob-spark-shell-program: specify the path of your spark-shell program.
- ob-spark-shell-cli-args: add your own CLI args. The default value disables the progress bar (spark.ui.showConsoleProgress=false).
- Lacks variable type reflection; all variables passed to the shell arrive as Scala strings (see the sketch below).
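As a small illustration of this limitation (the variable name and value are hypothetical): a numeric Org-mode variable exposed to the shell arrives as a string, so it needs an explicit conversion before numeric use.

// Hypothetical: an Org-mode variable answer=42 arrives in the shell as a string.
val answer: String = "42"
// Convert it explicitly before using it as a number.
val answerAsInt = answer.toInt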
Built at Nubank.
Some code taken from: