This repository provides utility classes that improve the readability of unit/integration tests for Spark projects.
Using com.sysgears.DataFrameBuilder you can define your DataFrames in tests in a slightly more readable way:
import com.sysgears.DataFrameBuilder._
import org.apache.spark.sql.SparkSession
...
implicit val spark: SparkSession = ...
val users =
! "first_name" | "last_name" | "age" |
! "John" | "Johnson" | 17 |
! "Henry" | "Petrovich" | 18 |
! "Harry" | "Harrison" | 19 |
The first row is the header: the column names. The other rows contain data, and column types are taken from the first data row. Because each row starts with the unary ! operator, there is one restriction: the first column must not be a Boolean. If it is, simply reorder the columns so that a non-Boolean column comes first.
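For example, if a table has a Boolean column, place it anywhere but first (the column names below are made up for illustration):
val accounts =
! "login" | "active" |
! "jdoe" | true |
! "hsmith" | false |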
The DataFrames class allows you to mock a DAO that provides data frames, so you can later assert on them. Its methods take two parameters, format and path, to keep the API close to Spark's.
To create or get a DataFrames object:
import com.sysgears.DataFrames
...
new DataFrames().addReadableTable("jdbc", "users", users) // or
DataFrames.threadLocal.addReadableTable("jdbc", "users", users)
Now you can implement your test DAO as follows:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import com.sysgears.DataFrames
trait UsersDao {
  def getAll(): DataFrame
  def save(dataFrame: DataFrame): Unit
}

class TestUsersDao(dataFrames: DataFrames) extends UsersDao {
  override def getAll(): DataFrame = dataFrames.read("jdbc", "users")
  override def save(dataFrame: DataFrame): Unit = dataFrames.write("jdbc", "users", dataFrame)
}
After that, to get everything that has been written to a table:
import com.sysgears.DataFrames
...
dataFrames.getWrittenTable("jdbc", "users").show() // or
DataFrames.threadLocal.getWrittenTable("jdbc", "users").show()
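Putting it all together, a minimal sketch of a test using TestUsersDao and the users DataFrame built above (the age filter merely stands in for your real job logic):
val dataFrames = new DataFrames()
dataFrames.addReadableTable("jdbc", "users", users) // the table the DAO will read

val dao = new TestUsersDao(dataFrames)
dao.save(dao.getAll().filter("age >= 18")) // stand-in for the production job

dataFrames.getWrittenTable("jdbc", "users").show() // inspect/assert on what was written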
This feature is experimental: you can use the SparkStub class in your tests without any DAO at all.
val dataFrames = new DataFrames()
val spark: SparkSession = SparkStub.create(dataFrames)
As its first argument it takes a DataFrames object (DataFrames.threadLocal by default). Every load operation is replaced by dataFrames.read("...", "...") and every save operation on a Dataset is replaced by dataFrames.write("...", "...", ...).
So you can use dataFrames.addReadableTable("...", "...", ...) to register the tables Spark should read, and retrieve everything that was written with dataFrames.getWrittenTable("...", "...").
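For instance, a hedged sketch of how a stubbed session behaves (the table names and the filter are illustrative only):
dataFrames.addReadableTable("jdbc", "users", users)

// this read is served by dataFrames.read("jdbc", "users")
val adults = spark.read.format("jdbc").load("users").filter("age >= 18")

// this save is captured as dataFrames.write("jdbc", "adults", ...)
adults.write.format("jdbc").save("adults")

dataFrames.getWrittenTable("jdbc", "adults").show()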
To convert Cucumber's DataTable to/from Spark's DataFrame, you can use DataSetConverter and its converter methods.
import io.cucumber.java.en.Given
import io.cucumber.datatable.DataTable
import org.apache.spark.sql.SparkSession
import com.sysgears.DataSetConverter._ // assumed import that brings the asDataFrame extension into scope

class Steps {
  private implicit val sparkSession: SparkSession = ...

  @Given("spark format: {string} for table: {string} has data:")
  def setDataFrame(format: String, tableOrPath: String, dataTable: DataTable): Unit = {
    dataTable.asDataFrame()
  }
}
After that you will be able to use it like this:
Given spark format: "jdbc" for table: "users" has data:
| first_name STRING | last_name STRING | age INT |
| John | Petrovich | 17 |
| Henry | Johnson | 18 |
| Harry | Potter | 19 |
You can specify default fields as a Spark SQL DDL-style string:
dataTable.asDataFrame(defaults = "age INT")
You can omit the type declarations if you have a Java/Scala class that represents this DataFrame:
dataTable.asTypeCheckedDataFrame(classOf[User])
dataTable.asTypeCheckedDataFrame(classOf[User], defaults = "age")
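Here User is a hypothetical class whose fields mirror the table headers, for example:
case class User(first_name: String, last_name: String, age: Int)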
The library already ships with a set of steps that can be used to write BDD tests. Here is a list of all available Given steps:
Given spark format: "jdbc" for table: "users" has data:
| first_name STRING | last_name STRING | age INT |
| John | Petrovich | 17 |
Given spark format: "jdbc" for table: "users" has data with defaults "age INT":
| first_name STRING | last_name STRING |
| John | Petrovich |
Given spark format: "jdbc" for table: "users" has data as "com.example.User":
| first_name | last_name | age |
| John | Petrovich | 17 |
Given spark format: "jdbc" for table: "users" has data with defaults "age" as "com.example.User":
| first_name | last_name |
| John | Petrovich |
Given spark format: "jdbc" for table: "users" has data with defaults as "com.example.User":
| first_name | last_name |
| John | Petrovich |
and all available Then steps:
Then spark format: "jdbc" for table: "users" has data:
| first_name STRING | last_name STRING | age INT |
| John | Petrovich | 17 |
Then spark format: "jdbc" for table: "users" wrote data with defaults "age INT":
| first_name STRING | last_name STRING |
| John | Petrovich |
Then spark format: "jdbc" for table: "users" wrote data as "com.example.User":
| first_name | last_name | age |
| John | Petrovich | 17 |
Then spark format: "jdbc" for table: "users" wrote data with defaults "age" as "com.example.User":
| first_name | last_name |
| John | Petrovich |
Then spark format: "jdbc" for table: "users" wrote data with defaults as "com.example.User":
| first_name | last_name |
| John | Petrovich |
SparkSteps is integrated with DataFrames, so in your test job runner code you should pass it to your job method, bind it to your DAO mock, or use SparkStub.
SparkSteps is instantiated by Guice via @Inject and takes three parameters:
SparkSession - required to enable conversion between Cucumber's DataTable and Spark's DataFrame
DataFrames - required to register the tables available for reading and to assert on written tables
@Named("cucumber.spark.datatype-packages") dataTypesPackages: Array[String] - allows using short class names in steps. For example, when dataTypesPackages = Array("com.example", "com") you can write short class names:
Then spark format: "jdbc" for table: "users" wrote data as "User":
Then spark format: "jdbc" for table: "users" wrote data as "example.User":
To provide these arguments, let's create a module for Cucumber:
import com.google.inject._
import com.google.inject.name.Named
import com.sysgears.{DataFrames, SparkStub}
import org.apache.spark.sql.SparkSession

class TestModule extends AbstractModule {
  override def configure(): Unit = {}

  @Provides
  @Singleton
  def dataFrames(): DataFrames = new DataFrames() // or DataFrames.threadLocal

  @Provides
  @Singleton
  def session(dataFrames: DataFrames): SparkSession = SparkStub.create(dataFrames) // or any other session

  @Provides
  @Named("cucumber.spark.datatype-packages")
  def dataTypePackages(): Array[String] = Array("com.example.demo")
}
Configure the object factory:
import com.google.inject._
import io.cucumber.core.backend.ObjectFactory
import io.cucumber.guice.{CucumberModules, ScenarioScope}
class TestObjectFactory extends ObjectFactory {
  private val injector = Guice.createInjector(
    Stage.PRODUCTION,
    CucumberModules.createScenarioModule,
    new TestModule()
  )

  override def start(): Unit = injector.getInstance(classOf[ScenarioScope]).enterScope()
  override def stop(): Unit = injector.getInstance(classOf[ScenarioScope]).exitScope()
  override def getInstance[T](glueClass: Class[T]): T = injector.getInstance(glueClass)
  override def addClass(glueClass: Class[_]): Boolean = true
}
And register it in /src/resources/META-INF/services/io.cucumber.core.backend.ObjectFactory:
com.example.demo.TestObjectFactory
And finally, pass everything to Cucumber (here we use the JUnit integration):
import io.cucumber.junit.{Cucumber, CucumberOptions}
import org.junit.runner.RunWith
@RunWith(classOf[Cucumber])
@CucumberOptions(
  objectFactory = classOf[TestObjectFactory], // the object factory created above
  glue = Array("com.sysgears", "com.example.demo"), // com.sysgears contains the step definitions; com.example.demo is just an example
  features = Array("classpath:") // load all features defined in the root of the resources folder
)
class CucumberDemo {}