
Spark helper methods to maximize developer productivity ✨😲




  1. Add the sbt-spark-package plugin to your application. The spark-daria releases are maintained in Spark Packages.

  2. Update your build.sbt file: spDependencies += "mrpowers/spark-daria:0.5.0"

  3. Import the spark-daria code into your project, for example:

import com.github.mrpowers.spark.daria.sql.SparkSessionExt._

✅ DataFrameValidator

Custom transformations often make assumptions about the presence or absence of columns in the DataFrame. It's important to document these dependencies in the code and provide users with descriptive error messages if assumptions are not met.

Here's an example of a custom transformation that uses the validatePresenceOfColumns validation.

import com.github.mrpowers.spark.daria.sql.DataFrameValidator
import org.apache.spark.sql.DataFrame

object MyTransformations extends DataFrameValidator {

  def withStandardizedPersonInfo(df: DataFrame): DataFrame = {
    val requiredColNames = Seq("name", "age")
    validatePresenceOfColumns(df, requiredColNames)
    // some transformation code
    df // stub: return the transformed DataFrame here
  }

}


validatePresenceOfColumns validates that the required columns are included in a DataFrame. This code will error out:

val sourceDF = Seq(
  ("jets", "football"),
  ("nacional", "soccer")
).toDF("team", "sport")

val requiredColNames = Seq("team", "sport", "country", "city")

validatePresenceOfColumns(sourceDF, requiredColNames)

This is the error message:

com.github.mrpowers.spark.daria.sql.MissingDataFrameColumnsException: The [country, city] columns are not included in the DataFrame with the following columns [team, sport]
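
The check behind this error message comes down to finding the required names that are absent from the DataFrame's columns. Here is a minimal plain-Scala sketch of that idea. It is not spark-daria's implementation: PresenceCheckSketch, missingColumns and requirePresence are hypothetical names, the exception type is a generic stand-in, and the column lists are plain Seq[String] values (what df.columns.toSeq would give you) so that the snippet runs without Spark.

```scala
// Hypothetical helpers that illustrate the idea; not part of spark-daria.
object PresenceCheckSketch {

  // Required names that do not appear among the actual column names,
  // kept in the order in which they were requested.
  def missingColumns(actual: Seq[String], required: Seq[String]): Seq[String] =
    required.filterNot(name => actual.contains(name))

  // Throws with a descriptive message when any required column is missing.
  // IllegalArgumentException is a stand-in for a library-specific exception.
  def requirePresence(actual: Seq[String], required: Seq[String]): Unit = {
    val missing = missingColumns(actual, required)
    if (missing.nonEmpty) {
      throw new IllegalArgumentException(
        s"The [${missing.mkString(", ")}] columns are not included in the DataFrame " +
          s"with the following columns [${actual.mkString(", ")}]"
      )
    }
  }
}
```

For the DataFrame in the example above, PresenceCheckSketch.missingColumns(Seq("team", "sport"), requiredColNames) would return the country and city entries.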


validateSchema validates the schema of a DataFrame. This code will error out:

val sourceData = List(
  Row(1, 1),
  Row(-8, 8),
  Row(-5, 5),
  Row(null, null)
)

val sourceSchema = List(
  StructField("num1", IntegerType, true),
  StructField("num2", IntegerType, true)
)

val sourceDF = spark.createDataFrame(
  spark.sparkContext.parallelize(sourceData),
  StructType(sourceSchema)
)

val requiredSchema = StructType(
  List(
    StructField("num1", IntegerType, true),
    StructField("num2", IntegerType, true),
    StructField("name", StringType, true)
  )
)

validateSchema(sourceDF, requiredSchema)

This is the error message:

com.github.mrpowers.spark.daria.sql.InvalidDataFrameSchemaException: The [StructField(name,StringType,true)] StructFields are not included in the DataFrame with the following StructFields [StructType(StructField(num1,IntegerType,true), StructField(num2,IntegerType,true))]


validateAbsenceOfColumns validates that the prohibited columns are not included in a DataFrame. This code will error out:

val sourceDF = Seq(
  ("jets", "football"),
  ("nacional", "soccer")
).toDF("team", "sport")

val prohibitedColNames = Seq("team", "sport", "country", "city")

validateAbsenceOfColumns(sourceDF, prohibitedColNames)

This is the error message:

com.github.mrpowers.spark.daria.sql.ProhibitedDataFrameColumnsException: The [team, sport] columns are not allowed to be included in the DataFrame with the following columns [team, sport]
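
The absence check is the mirror image of the presence check: it looks for prohibited names that do occur among the DataFrame's columns. A short plain-Scala sketch of that intersection follows. AbsenceCheckSketch and presentProhibited are hypothetical names, not spark-daria API, and the column lists are plain Seq[String] values so that the snippet runs without Spark.

```scala
// Hypothetical helper that illustrates the idea; not part of spark-daria.
object AbsenceCheckSketch {

  // Prohibited names that are present among the actual column names.
  def presentProhibited(actual: Seq[String], prohibited: Seq[String]): Seq[String] =
    prohibited.filter(name => actual.contains(name))
}
```

With the inputs from the example above, the helper returns the team and sport entries, which are the two columns named in the error message.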

😍 Creating DataFrames

Spark provides two methods for creating DataFrames:

  • createDataFrame is verbose
  • toDF doesn't provide enough control for customizing the schema

spark-daria defines a createDF method that combines the terse syntax of toDF with the schema control of createDataFrame.

spark.createDF(
  List(
    ("bob", 45),
    ("liz", 25),
    ("freeman", 32)
  ), List(
    ("name", StringType, true),
    ("age", IntegerType, false)
  )
)
The createDF method can also be used with lists of Row and StructField objects.

spark.createDF(
  List(
    Row("bob", 45),
    Row("liz", 25),
    Row("freeman", 32)
  ), List(
    StructField("name", StringType, true),
    StructField("age", IntegerType, false)
  )
)

🔗 Chaining UDFs and SQL functions

The ColumnExt class monkey patches the org.apache.spark.sql.Column class, so SQL functions and user-defined functions can be chained.
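
In Scala, this kind of monkey patching is usually done with an implicit class that wraps the target type and adds methods to it. The sketch below applies that pattern to plain String values so it runs without Spark. ChainSketch, ChainOps and this chain method are hypothetical stand-ins for illustration and are not the ColumnExt code.

```scala
object ChainSketch {
  // Wraps any value and adds a `chain` method that applies a function to it.
  implicit class ChainOps[A](private val value: A) {
    def chain[B](f: A => B): B = f(value)
  }
}

object ChainSketchDemo {
  import ChainSketch._

  // Reads left to right instead of nesting the calls inside each other.
  val cleaned: String = "  CATWOMAN".chain(_.toLowerCase).chain(_.trim)
}
```

The benefit is readability: each step is appended to the previous one, which is the same shape as chaining SQL functions on a Column.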


The chain method takes an org.apache.spark.sql.functions function as an argument and can be used as follows:

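val wordsDf = Seq(
  ("Batman  "),
  ("  CATWOMAN"),
  (" pikachu ")
).toDF("word")

val actualDf = wordsDf.withColumn(
  "cleaned_word",
  col("word").chain(lower).chain(trim)
)

+----------+------------+
|      word|cleaned_word|
+----------+------------+
|  Batman  |      batman|
|  CATWOMAN|    catwoman|
|  pikachu |     pikachu|
+----------+------------+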


The chainUDF method takes the name of a user defined function as an argument and can be used as follows:

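def appendZ(s: String): String = {
  s"${s}Z"
}

spark.udf.register("appendZUdf", appendZ _)

def prependA(s: String): String = {
  s"A${s}"
}

spark.udf.register("prependAUdf", prependA _)

val hobbiesDf = Seq(
  ("sing")
).toDF("word")

val actualDf = hobbiesDf.withColumn(
  "fun",
  col("word").chainUDF("appendZUdf").chainUDF("prependAUdf")
)

+-----+-------+
| word|    fun|
+-----+-------+
| sing| AsingZ|
+-----+-------+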

Using chain() and chainUDF() together

The chain and chainUDF methods can be used together as follows:

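def appendZ(s: String): String = {
  s"${s}Z"
}

spark.udf.register("appendZUdf", appendZ _)

val wordsDf = Seq(
  ("Batman  "),
  ("  CATWOMAN"),
  (" pikachu ")
).toDF("word")

val actualDf = wordsDf.withColumn(
  "cleaned_word",
  col("word").chain(lower).chain(trim).chainUDF("appendZUdf")
)

+----------+------------+
|      word|cleaned_word|
+----------+------------+
|  Batman  |     batmanZ|
|  CATWOMAN|   catwomanZ|
|  pikachu |    pikachuZ|
+----------+------------+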

Column Extensions


isFalsy returns true if a column is null or false and false otherwise.

Suppose you start with the following sourceDF:

+------+
|is_fun|
+------+
|  true|
| false|
|  null|
+------+

Run the isFalsy method:

val actualDF = sourceDF.withColumn("is_fun_falsy", col("is_fun").isFalsy)

Here are the contents of actualDF:

+------+------------+
|is_fun|is_fun_falsy|
+------+------------+
|  true|       false|
| false|        true|
|  null|        true|
+------+------------+

isFalsy can also be used on StringType columns.

Suppose you have the following sourceDF DataFrame.

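+-----------+
|animal_type|
+-----------+
|        dog|
|        cat|
|       null|
+-----------+

Run the isFalsy method:

val actualDF = sourceDF.withColumn("animal_type_falsy", col("animal_type").isFalsy) // assumed column names

Here are the contents of actualDF:

+-----------+-----------------+
|animal_type|animal_type_falsy|
+-----------+-----------------+
|        dog|            false|
|        cat|            false|
|       null|             true|
+-----------+-----------------+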


isTruthy returns false if a column is null or false and true otherwise.

Suppose you start with the following sourceDF:

+------+
|is_fun|
+------+
|  true|
| false|
|  null|
+------+

Run the isTruthy method:

val actualDF = sourceDF.withColumn("is_fun_truthy", col("is_fun").isTruthy)

Here are the contents of actualDF:

+------+-------------+
|is_fun|is_fun_truthy|
+------+-------------+
|  true|         true|
| false|        false|
|  null|        false|
+------+-------------+
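
The isFalsy and isTruthy definitions are complements of each other. The plain-Scala sketch below encodes the two truth tables for a nullable boolean, using Option[Boolean] with None standing in for a null cell. TruthinessSketch and its methods are hypothetical names that model the documented behaviour and are not the ColumnExt implementation.

```scala
// Hypothetical model of the documented truth tables; not spark-daria code.
object TruthinessSketch {

  // Falsy when the value is null (None) or false.
  def isFalsy(value: Option[Boolean]): Boolean =
    value.isEmpty || value.contains(false)

  // Truthy is the exact opposite of falsy, so null is never truthy.
  def isTruthy(value: Option[Boolean]): Boolean =
    !isFalsy(value)
}
```

Because both methods always return a plain Boolean, a null input never produces a null result, which is what makes them convenient in filters.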


The isNullOrBlank method returns true if a column is null or blank and false otherwise.

Suppose you start with the following sourceDF:

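+-----------+
|animal_type|
+-----------+
|        dog|
|       null|
|         ""|
|     "    "|
+-----------+

Run the isNullOrBlank method:

val actualDF = sourceDF.withColumn("animal_type_is_null_or_blank", col("animal_type").isNullOrBlank) // assumed column names

Here are the contents of actualDF:

+-----------+----------------------------+
|animal_type|animal_type_is_null_or_blank|
+-----------+----------------------------+
|        dog|                       false|
|       null|                        true|
|         ""|                        true|
|     "    "|                        true|
+-----------+----------------------------+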
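
The rule shown in this table treats a value as blank when nothing is left after trimming whitespace. A plain-Scala sketch of that rule follows, with Option[String] modelling a nullable string cell (None stands in for null). BlankCheckSketch is a hypothetical name, and the trimming rule is an assumption read off the table above rather than the library's source.

```scala
// Hypothetical model of the documented behaviour; not spark-daria code.
object BlankCheckSketch {

  // True for null (None), for the empty string, and for whitespace-only strings.
  def isNullOrBlank(value: Option[String]): Boolean =
    value.forall(_.trim.isEmpty)
}
```

Option.forall returns true for None, so the null case needs no separate branch.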


The isNotIn method returns true if a column element is not in a list and false otherwise. It's the opposite of the isin method.

Suppose you start with the following sourceDF:

+-----+
|stuff|
+-----+
|  dog|
|shoes|
|laces|
| null|
+-----+

Run the isNotIn method:

val footwearRelated = Seq("laces", "shoes")

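val actualDF = sourceDF.withColumn(
  "is_not_footwear_related", // assumed output column name
  col("stuff").isNotIn(footwearRelated: _*)
)

Here are the contents of actualDF:

+-----+-----------------------+
|stuff|is_not_footwear_related|
+-----+-----------------------+
|  dog|                   true|
|shoes|                  false|
|laces|                  false|
| null|                   null|
+-----+-----------------------+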
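
Note that the null row stays null rather than becoming true: in SQL's three-valued logic, negating an unknown membership test is still unknown. The plain-Scala sketch below models that propagation with Option, where None stands in for null. IsNotInSketch is a hypothetical name and models only the behaviour shown in the table, not the ColumnExt implementation.

```scala
// Hypothetical model of the documented behaviour; not spark-daria code.
object IsNotInSketch {

  // A null input (None) yields a null result (None);
  // otherwise the result is the negated membership test.
  def isNotIn(value: Option[String], list: String*): Option[Boolean] =
    value.map(v => !list.contains(v))
}
```

If you need a strict true/false answer, the null case has to be handled separately, for example by combining the result with a null check.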


The built-in between method doesn't work well when one of the bounds is undefined. nullBetween is more useful when you have "less than or equal to" or "greater than or equal to" logic embedded in your upper and lower bounds. For example, if the lower bound is null and the upper bound is 15, nullBetween will interpret that as "all values less than or equal to 15".

Let's compare the between and nullBetween methods with a code snippet and the resulting DataFrame.

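// "between" and "nullBetween" are assumed output column names
val actualDF = sourceDF
  .withColumn("between", col("age").between(col("lower_bound"), col("upper_bound")))
  .withColumn("nullBetween", col("age").nullBetween(col("lower_bound"), col("upper_bound")))

+-----------+-----------+---+-------+-----------+
|lower_bound|upper_bound|age|between|nullBetween|
+-----------+-----------+---+-------+-----------+
|         10|         15| 11|   true|       true|
|         17|       null| 94|   null|       true|
|       null|         10|  5|   null|       true|
+-----------+-----------+---+-------+-----------+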
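
The open-ended bound logic can be written as a plain-Scala sketch, with Option[Int] modelling a nullable bound (None stands in for null). NullBetweenSketch is a hypothetical name. The inclusive comparisons follow the "less than or equal to" wording above, and the result for two missing bounds is an assumption because the text does not cover that case.

```scala
// Hypothetical model of the described behaviour; not spark-daria code.
object NullBetweenSketch {

  def nullBetween(value: Int, lower: Option[Int], upper: Option[Int]): Boolean =
    (lower, upper) match {
      case (Some(lo), Some(hi)) => lo <= value && value <= hi // ordinary closed interval
      case (None, Some(hi))     => value <= hi                // no lower bound
      case (Some(lo), None)     => value >= lo                // no upper bound
      case (None, None)         => false                      // assumption: undefined in the text
    }
}
```

The three rows of the table correspond to the first three cases: (11, 10 to 15), (94, 17 and up) and (5, up to 10) all evaluate to true.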

⚡ sql.functions

Spark has a ton of SQL functions and spark-daria is meant to fill in any gaps.

For example, there is a datediff function that calculates the number of days between two dates, but there isn't a yeardiff function that calculates the number of years between two dates.

The com.github.mrpowers.spark.daria.sql.functions.yeardiff function fills the gap. Let's see how it works!

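Suppose we have the following testDf:

+--------------------+--------------------+
|      first_datetime|     second_datetime|
+--------------------+--------------------+
|2016-09-10 00:00:...|2001-08-10 00:00:...|
|2016-04-18 00:00:...|2010-05-18 00:00:...|
|2016-01-10 00:00:...|2013-08-10 00:00:...|
|                null|                null|
+--------------------+--------------------+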

We can run the yeardiff function as follows:

import com.github.mrpowers.spark.daria.sql.functions._

val actualDf = testDf
  .withColumn("num_years", yeardiff(col("first_datetime"), col("second_datetime")))


Console output:

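+--------------------+--------------------+------------------+
|      first_datetime|     second_datetime|         num_years|
+--------------------+--------------------+------------------+
|2016-09-10 00:00:...|2001-08-10 00:00:...|15.095890410958905|
|2016-04-18 00:00:...|2010-05-18 00:00:...| 5.923287671232877|
|2016-01-10 00:00:...|2013-08-10 00:00:...| 2.419178082191781|
|                null|                null|              null|
+--------------------+--------------------+------------------+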
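
The num_years values are consistent with dividing the number of days between the two dates by 365. For the first row, that is 5510 days / 365, or roughly 15.0959. The plain-Scala sketch below reproduces that arithmetic with java.time so it runs without Spark. YearDiffSketch and yearDiff are hypothetical names, and the divide-by-365 rule is an assumption inferred from the output above, not taken from the library's source.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Hypothetical helper that reproduces the arithmetic; not spark-daria code.
object YearDiffSketch {

  // Whole days from start to end, divided by 365.
  // Leap days are not treated specially, so the result is an approximation.
  def yearDiff(end: LocalDate, start: LocalDate): Double =
    ChronoUnit.DAYS.between(start, end) / 365.0
}
```

Calling YearDiffSketch.yearDiff(LocalDate.of(2016, 9, 10), LocalDate.of(2001, 8, 10)) gives a value of about 15.0959, in line with the first row of the console output.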

🔱 sql.transformations

SQL transformations take a DataFrame as an argument and return a DataFrame. They are suitable arguments for the Dataset#transform method.

It's convenient to work with DataFrames that have snake_case column names. Column names with spaces make it harder to write SQL queries.

spark-daria defines a com.github.mrpowers.spark.daria.sql.transformations.snakeCaseColumns transformation to convert all the column names to snake_case.
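
As a rough illustration of the renaming rule, here is a plain-Scala sketch that converts a single column name. It assumes that snake_casing means lowercasing the name and replacing whitespace with underscores. SnakeCaseSketch and toSnakeCase are hypothetical names and are not the library's implementation.

```scala
// Hypothetical helper that illustrates the renaming rule; not spark-daria code.
object SnakeCaseSketch {

  // Lower-cases the name and turns each run of whitespace into one underscore.
  def toSnakeCase(name: String): String =
    name.trim.toLowerCase.replaceAll("\\s+", "_")
}
```

In a Spark job, a function like this could be applied to every column by folding over df.columns and calling withColumnRenamed for each one.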

import com.github.mrpowers.spark.daria.sql.transformations._

val sourceDf = Seq(
  ("funny", "joke")
).toDF("A b C", "de F")

val actualDf = sourceDf.transform(snakeCaseColumns)


Console output:

+-----+----+
|a_b_c|de_f|
+-----+----+
|funny|joke|
+-----+----+


👭 👬 👫 Contribution Criteria

We are actively looking for contributors to add functionality that fills in the gaps of the Spark source code.

To get started, fork the project and submit a pull request. Please write tests!

After submitting a couple of good pull requests, you'll be added as a contributor to the project.

Continued excellence will be rewarded with push access to the master branch.