spark-daria
Spark helper methods to maximize developer productivity.
Setup
- Add the sbt-spark-package plugin to your application. The spark-daria releases are maintained in Spark Packages.
- Update your build.sbt file:
spDependencies += "mrpowers/spark-daria:0.5.0"
- Import the spark-daria code into your project, for example:
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._
✅ DataFrameValidator
Custom transformations often make assumptions about the presence or absence of columns in the DataFrame. It's important to document these dependencies in the code and provide users with descriptive error messages if assumptions are not met.
Here's an example of a custom transformation that uses the validatePresenceOfColumns validation.
import org.apache.spark.sql.DataFrame
import com.github.mrpowers.spark.daria.sql.DataFrameValidator

object MyTransformations extends DataFrameValidator {
  def withStandardizedPersonInfo(df: DataFrame): DataFrame = {
    val requiredColNames = Seq("name", "age")
    validatePresenceOfColumns(df, requiredColNames)
    // some transformation code
    df // placeholder return so the example compiles
  }
}
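Downstream code can then apply the transformation with the Dataset#transform method. Here's a hypothetical usage, assuming a peopleDF DataFrame that contains the required name and age columns:
val peopleDF = Seq(
  ("bob", 45)
).toDF("name", "age")

// passes validation because "name" and "age" are both present
val standardizedDF = peopleDF.transform(MyTransformations.withStandardizedPersonInfo)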
validatePresenceOfColumns()
Validates if columns are included in a DataFrame. This code will error out:
val sourceDF = Seq(
  ("jets", "football"),
  ("nacional", "soccer")
).toDF("team", "sport")
val requiredColNames = Seq("team", "sport", "country", "city")
validatePresenceOfColumns(sourceDF, requiredColNames)
This is the error message:
com.github.mrpowers.spark.daria.sql.MissingDataFrameColumnsException: The [country, city] columns are not included in the DataFrame with the following columns [team, sport]
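For contrast, the validation returns silently when every required column is present:
// does not error out because "team" and "sport" both exist in sourceDF
validatePresenceOfColumns(sourceDF, Seq("team", "sport"))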
validateSchema()
Validates the schema of a DataFrame. This code will error out:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sourceData = List(
  Row(1, 1),
  Row(-8, 8),
  Row(-5, 5),
  Row(null, null)
)
val sourceSchema = List(
  StructField("num1", IntegerType, true),
  StructField("num2", IntegerType, true)
)
val sourceDF = spark.createDataFrame(
  spark.sparkContext.parallelize(sourceData),
  StructType(sourceSchema)
)
val requiredSchema = StructType(
  List(
    StructField("num1", IntegerType, true),
    StructField("num2", IntegerType, true),
    StructField("name", StringType, true)
  )
)
validateSchema(sourceDF, requiredSchema)
This is the error message:
com.github.mrpowers.spark.daria.sql.InvalidDataFrameSchemaException: The [StructField(name,StringType,true)] StructFields are not included in the DataFrame with the following StructFields [StructType(StructField(num1,IntegerType,true), StructField(num2,IntegerType,true))]
validateAbsenceOfColumns()
Validates that columns are not included in a DataFrame. This code will error out:
val sourceDF = Seq(
  ("jets", "football"),
  ("nacional", "soccer")
).toDF("team", "sport")
val prohibitedColNames = Seq("team", "sport", "country", "city")
validateAbsenceOfColumns(sourceDF, prohibitedColNames)
This is the error message:
com.github.mrpowers.spark.daria.sql.ProhibitedDataFrameColumnsException: The [team, sport] columns are not allowed to be included in the DataFrame with the following columns [team, sport]
😍 Creating DataFrames
Spark provides two methods for creating DataFrames:
- createDataFrame is verbose
- toDF doesn't provide enough control for customizing the schema
spark-daria defines a createDF method that allows for the terse syntax of toDF and the control of createDataFrame.
spark.createDF(
  List(
    ("bob", 45),
    ("liz", 25),
    ("freeman", 32)
  ), List(
    ("name", StringType, true),
    ("age", IntegerType, false)
  )
)
The createDF method can also be used with lists of Row and StructField objects.
spark.createDF(
  List(
    Row("bob", 45),
    Row("liz", 25),
    Row("freeman", 32)
  ), List(
    StructField("name", StringType, true),
    StructField("age", IntegerType, false)
  )
)
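If you're curious how the terse syntax maps back to createDataFrame, here is a minimal sketch of a createDF-style helper for the Row/StructField form, written as an implicit class on SparkSession. The SparkSessionExtSketch name is hypothetical and the real spark-daria code handles more input shapes:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StructField, StructType}

object SparkSessionExtSketch {
  implicit class SparkSessionMethods(spark: SparkSession) {
    // builds a DataFrame from rows and fields without the parallelize boilerplate
    def createDF(rowData: List[Row], fields: List[StructField]): DataFrame =
      spark.createDataFrame(
        spark.sparkContext.parallelize(rowData),
        StructType(fields)
      )
  }
}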
🔗 Chaining UDFs and SQL functions
The ColumnExt class monkey patches the org.apache.spark.sql.Column class, so SQL functions and user defined functions can be chained (relevant blog post).
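Here is a minimal sketch of how such an extension could be wired up with an implicit class; ColumnExtSketch is a hypothetical name and the actual spark-daria internals may differ:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.callUDF

object ColumnExtSketch {
  implicit class ColumnMethods(col: Column) {
    // applies a Column => Column function (e.g. lower, trim) to this column
    def chain(fn: Column => Column): Column = fn(col)

    // applies a registered UDF, looked up by name, to this column
    def chainUDF(udfName: String): Column = callUDF(udfName, col)
  }
}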
chain()
The chain method takes an org.apache.spark.sql.functions function as an argument and can be used as follows:
val wordsDf = Seq(
  ("Batman "),
  (" CATWOMAN"),
  (" pikachu ")
).toDF("word")
val actualDf = wordsDf.withColumn(
  "cleaned_word",
  col("word").chain(lower).chain(trim)
)
actualDf.show()
+----------+------------+
| word|cleaned_word|
+----------+------------+
| Batman | batman|
| CATWOMAN| catwoman|
| pikachu | pikachu|
+----------+------------+
chainUDF()
The chainUDF method takes the name of a user defined function as an argument and can be used as follows:
def appendZ(s: String): String = {
  s"${s}Z"
}
spark.udf.register("appendZUdf", appendZ _)
def prependA(s: String): String = {
  s"A${s}"
}
spark.udf.register("prependAUdf", prependA _)
val hobbiesDf = Seq(
  ("dance"),
  ("sing")
).toDF("word")
val actualDf = hobbiesDf.withColumn(
  "fun",
  col("word").chainUDF("appendZUdf").chainUDF("prependAUdf")
)
actualDf.show()
+-----+-------+
| word| fun|
+-----+-------+
|dance|AdanceZ|
| sing| AsingZ|
+-----+-------+
chain() and chainUDF() together
The chain and chainUDF methods can be used together as follows:
def appendZ(s: String): String = {
  s"${s}Z"
}
spark.udf.register("appendZUdf", appendZ _)
val wordsDf = Seq(
  ("Batman "),
  (" CATWOMAN"),
  (" pikachu ")
).toDF("word")
val actualDf = wordsDf.withColumn(
  "cleaned_word",
  col("word").chain(lower).chain(trim).chainUDF("appendZUdf")
)
actualDf.show()
+----------+------------+
| word|cleaned_word|
+----------+------------+
| Batman | batmanZ|
| CATWOMAN| catwomanZ|
| pikachu | pikachuZ|
+----------+------------+
Column Extensions
isFalsy
isFalsy returns true if a column is null or false, and false otherwise.
Suppose you start with the following sourceDF:
+------+
|is_fun|
+------+
| true|
| false|
| null|
+------+
Run the isFalsy method:
val actualDF = sourceDF.withColumn("is_fun_falsy", col("is_fun").isFalsy)
Here are the contents of actualDF:
+------+------------+
|is_fun|is_fun_falsy|
+------+------------+
| true| false|
| false| true|
| null| true|
+------+------------+
isFalsy can also be used on StringType columns. Suppose you have the following sourceDF DataFrame:
+-----------+
|animal_type|
+-----------+
| dog|
| cat|
| null|
+-----------+
Run the isFalsy method:
val actualDF = sourceDF.withColumn(
  "animal_type_falsy",
  col("animal_type").isFalsy
)
Here are the contents of actualDF:
+-----------+-----------------+
|animal_type|animal_type_falsy|
+-----------+-----------------+
| dog| false|
| cat| false|
| null| true|
+-----------+-----------------+
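The falsy logic above can be expressed with Spark's when function. Here's a minimal sketch, assuming the hypothetical isFalsySketch mirrors the behavior shown in the tables; the real spark-daria implementation may differ:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lit, when}

// true when the column is null or false, false otherwise
def isFalsySketch(col: Column): Column =
  when(col.isNull || col === lit(false), lit(true)).otherwise(lit(false))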
isTruthy
isTruthy returns false if a column is null or false, and true otherwise.
Suppose you start with the following sourceDF:
+------+
|is_fun|
+------+
| true|
| false|
| null|
+------+
Run the isTruthy method:
val actualDF = sourceDF.withColumn("is_fun_truthy", col("is_fun").isTruthy)
Here are the contents of actualDF:
+------+-------------+
|is_fun|is_fun_truthy|
+------+-------------+
| true| true|
| false| false|
| null| false|
+------+-------------+
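isTruthy is the logical negation of isFalsy, so a sketch can simply reuse the hypothetical isFalsySketch from above:
import org.apache.spark.sql.Column

// false when the column is null or false, true otherwise
def isTruthySketch(col: Column): Column = !isFalsySketch(col)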
isNullOrBlank
The isNullOrBlank method returns true if a column is null or blank, and false otherwise.
Suppose you start with the following sourceDF:
+-----------+
|animal_type|
+-----------+
| dog|
| null|
| ""|
| " "|
+-----------+
Run the isNullOrBlank method:
val actualDF = sourceDF.withColumn(
  "animal_type_is_null_or_blank",
  col("animal_type").isNullOrBlank
)
Here are the contents of actualDF:
+-----------+----------------------------+
|animal_type|animal_type_is_null_or_blank|
+-----------+----------------------------+
| dog| false|
| null| true|
| ""| true|
| " "| true|
+-----------+----------------------------+
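A blank column is one that trims down to the empty string, so a sketch of this check (under a hypothetical isNullOrBlankSketch name) needs just two conditions:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.trim

// true when the column is null, empty, or only whitespace
def isNullOrBlankSketch(col: Column): Column =
  col.isNull || trim(col) === ""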
isNotIn
The isNotIn method returns true if a column element is not in a list and false otherwise. It's the opposite of the isin method.
Suppose you start with the following sourceDF:
+-----+
|stuff|
+-----+
| dog|
|shoes|
|laces|
| null|
+-----+
Run the isNotIn method:
val footwearRelated = Seq("laces", "shoes")
val actualDF = sourceDF.withColumn(
  "is_not_footwear_related",
  col("stuff").isNotIn(footwearRelated: _*)
)
Here are the contents of actualDF:
+-----+-----------------------+
|stuff|is_not_footwear_related|
+-----+-----------------------+
| dog| true|
|shoes| false|
|laces| false|
| null| null|
+-----+-----------------------+
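Since isNotIn is described as the opposite of isin, a sketch (hypothetical isNotInSketch name) can negate the built-in method; note that negation preserves the null result for null elements, as the table shows:
import org.apache.spark.sql.Column

// negates isin; null elements stay null under SQL three-valued logic
def isNotInSketch(col: Column, list: Any*): Column =
  !col.isin(list: _*)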
nullBetween
The built-in between method doesn't work well when one of the bounds is undefined. nullBetween is more useful when you have "less than or equal to" or "greater than or equal to" logic embedded in your upper and lower bounds. For example, if the lower bound is null and the upper bound is 15, nullBetween will interpret that as "all values less than or equal to 15".
Let's compare the between and nullBetween methods with a code snippet and the outputted DataFrame.
val actualDF = sourceDF.withColumn(
  "between",
  col("age").between(col("lower_bound"), col("upper_bound"))
).withColumn(
  "nullBetween",
  col("age").nullBetween(col("lower_bound"), col("upper_bound"))
)
+-----------+-----------+---+-------+-----------+
|lower_bound|upper_bound|age|between|nullBetween|
+-----------+-----------+---+-------+-----------+
| 10| 15| 11| true| true|
| 17| null| 94| null| true|
| null| 10| 5| null| true|
+-----------+-----------+---+-------+-----------+
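One way to get the behavior in the table is to fall back to a one-sided comparison when a bound is null. Here's a minimal sketch under a hypothetical nullBetweenSketch name; the real spark-daria implementation may differ:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.when

// treats a null lower bound as "no lower limit" and a null upper bound as "no upper limit"
def nullBetweenSketch(col: Column, lower: Column, upper: Column): Column =
  when(lower.isNull, col <= upper)
    .when(upper.isNull, col >= lower)
    .otherwise(col.between(lower, upper))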
⚡ sql.functions
Spark has a ton of SQL functions and spark-daria is meant to fill in any gaps. For example, there is a datediff function that calculates the number of days between two dates, but there isn't a yeardiff function that calculates the number of years between two dates.
The com.github.mrpowers.spark.daria.sql.functions.yeardiff function fills the gap. Let's see how it works!
Suppose we have the following testDf:
+--------------------+--------------------+
| first_datetime| second_datetime|
+--------------------+--------------------+
|2016-09-10 00:00:...|2001-08-10 00:00:...|
|2016-04-18 00:00:...|2010-05-18 00:00:...|
|2016-01-10 00:00:...|2013-08-10 00:00:...|
| null| null|
+--------------------+--------------------+
We can run the yeardiff function as follows:
import com.github.mrpowers.spark.daria.sql.functions._
val actualDf = testDf
  .withColumn("num_years", yeardiff(col("first_datetime"), col("second_datetime")))
actualDf.show()
Console output:
+--------------------+--------------------+------------------+
| first_datetime| second_datetime| num_years|
+--------------------+--------------------+------------------+
|2016-09-10 00:00:...|2001-08-10 00:00:...|15.095890410958905|
|2016-04-18 00:00:...|2010-05-18 00:00:...| 5.923287671232877|
|2016-01-10 00:00:...|2013-08-10 00:00:...| 2.419178082191781|
| null| null| null|
+--------------------+--------------------+------------------+
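The fractional values in the output are consistent with dividing a day difference by a 365-day year, so a yeardiff-style function can be sketched as follows (hypothetical yeardiffSketch name; the real implementation may differ):
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.datediff

// approximate year difference, assuming a 365-day year
def yeardiffSketch(end: Column, start: Column): Column =
  datediff(end, start) / 365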
🔱 sql.transformations
SQL transformations take a DataFrame as an argument and return a DataFrame. They are suitable arguments for the Dataset#transform method.
It's convenient to work with DataFrames that have snake_case column names. Column names with spaces make it harder to write SQL queries.
spark-daria defines a com.github.mrpowers.spark.daria.sql.transformations.snakeCaseColumns transformation to convert all the column names to snake_case.
import com.github.mrpowers.spark.daria.sql.transformations._
val sourceDf = Seq(
  ("funny", "joke")
).toDF("A b C", "de F")
val actualDf = sourceDf.transform(snakeCaseColumns)
actualDf.show()
Console output:
+-----+----+
|a_b_c|de_f|
+-----+----+
|funny|joke|
+-----+----+
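A snake_case conversion like this can be sketched as a fold over the column names; snakeCaseColumnsSketch is a hypothetical name and the real transformation may handle more cases:
import org.apache.spark.sql.DataFrame

// lowercases each column name and replaces spaces with underscores
def snakeCaseColumnsSketch(df: DataFrame): DataFrame =
  df.columns.foldLeft(df) { (memo, colName) =>
    memo.withColumnRenamed(colName, colName.toLowerCase.replace(" ", "_"))
  }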
👭 👬 👫 Contribution Criteria
We are actively looking for contributors to add functionality that fills in the gaps of the Spark source code.
To get started, fork the project and submit a pull request. Please write tests!
After submitting a couple of good pull requests, you'll be added as a contributor to the project.
Continued excellence will be rewarded with push access to the master branch.