/ob-spark-shell

Scala spark-shell backend for Org-mode's Babel

Primary LanguageEmacs LispMIT LicenseMIT

ob-spark-shell

A Scala spark-shell backend for Org-mode’s Babel.

Background

The only way I currently use this in my workflow is by creating case classes for tests when writing new Spark jobs. First, I load the parquet file and print the schema (which is really fast). Second, I create a case class for my unit test based on the printed schema. Finally, I df.as[T].show(1) (T is the new case class), which shows the dataframe or throws an exception when the case class doesn’t conform the production schema.

To be honest, for the time being I’m using IntelliJ to write the tests and jobs. But having the Org-mode document available to quickly inspect production data gives the development of new ETL jobs a speed boost. I’m looking forward to using this to run ad-hoc jobs on an external cluster, too. I do not recommend using this on a cluster, yet.

At this moment, it is not obvious how to connect to an external cluster. With some CLI arguments (see options), however, you can connect to any kind of cluster using the --master argument. Additionally, I once ran a remote spark-shell through an SSH tunnel (which consequently ran spark-shell through Docker). This is possible by supplying ssh as the program instead of spark-shell and supplying a bunch of arguments. I will add easy access to this functionality in the future, once the project is more stable.

Example

How does it work? Please check out the raw version of this README to see the underlying Org-mode code.

In the following example, we will first create a dataframe ourselves with two people in it. After that, we read some data from a parquet file. To end the example, we join the two dataframes.

Creating our own dataframe

Create a dataframe with people using a Scala case class.

case class Person(name: String, age: Int)

val people = Seq(
  Person("Fred", 23),
  Person("Sally", 42))

val peopleDf = spark.createDataFrame(people)
defined class Person
people: Seq[Person] = List(Person(Fred,23), Person(Sally,42))
peopleDf: org.apache.spark.sql.DataFrame = [name: string, age: int]

How many people are in this dataframe?

peopleDf.count
res39: Long = 2

ob-spark-shell recognizes Spark table outputs and returns them as Org-mode tables.

peopleDf.show
nameage
Fred23
Sally42

Reading parquet file

We store the location of a parquet file in the passions-parquet variable, below.

doc/passions.parquet

In the following code block, we expose the passions-parquet variable (as passions_parquet) to the spark-shell and then we load the parquet file and count the results.

val passionsDf = spark.read.load(passions_parquet)
passionsDf.count
passionsDf: org.apache.spark.sql.DataFrame = [person: string, thing: string]
res43: Long = 2

What is in the dataframe?

passionsDf.show
personthing
SallyIce cream
FredPizza

Joining them

To finish this example, let’s join the two dataframes that we made so far.

import org.apache.spark.sql.functions._
val df = peopleDf.
  join(passionsDf, $"name" === $"person").
  select($"person", $"age", $"thing".as("likes"))

import org.apache.spark.sql.functions._
df: org.apache.spark.sql.DataFrame = [person: string, age: int ... 1 more field]

Now we have everything in one place :-)

df.show
personagelikes
Fred23Pizza
Sally42Ice cream

Options

ob-spark-shell-program: specify the path of your spark-shell program.

ob-spark-shell-cli-args: add your own CLI args. Disables the progress bar by default (spark.ui.showConsoleProgress=false).

Limitations

  • Lacks var type reflection; all vars are Scala strings.

Acknowledgements

Built at Nubank.

Some code taken from: