drunken-data-quality

Spark package for checking data quality

Primary language: Scala. License: Apache License 2.0 (Apache-2.0).


Join the chat at https://gitter.im/FRosner/drunken-data-quality

Description

Drunken Data Quality (DDQ) is a small library for checking constraints on Spark data structures. It can be used to ensure a certain level of data quality, especially when data is imported continuously.

Getting DDQ

Spark Package

DDQ is available as a Spark package. You can add it to spark-shell, spark-submit or pyspark using the --packages command-line option:

spark-shell --packages FRosner:drunken-data-quality:3.2.1-s_2.10

Python API

DDQ also comes with a Python API. It is available on the Python Package Index, so you only need to install it once using pip:

pip install pyddq==3.2.1

Project Dependency

To use DDQ in your project, add it as a library dependency, either through the sbt-spark-package plugin or via JitPack.io.
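For illustration, an sbt build could declare the dependency in one of the following two ways. This is a sketch under assumptions, not a verified build setup: option 1 assumes the sbt-spark-package plugin is already enabled in project/plugins.sbt, and option 2 assumes JitPack can build a Git tag named 3.2.1 and that the coordinates follow JitPack's usual com.github.<user> % <repo> % <tag> convention. Use only one of the two options.

```scala
// build.sbt, option 1: through the sbt-spark-package plugin (assumed to be enabled).
// The "owner/name:version" string mirrors the --packages coordinate shown earlier.
spDependencies += "FRosner/drunken-data-quality:3.2.1-s_2.10"

// build.sbt, option 2: through JitPack.
// The tag "3.2.1" is an assumption; use the release tag you actually want to build.
resolvers += "jitpack" at "https://jitpack.io"
libraryDependencies += "com.github.FRosner" % "drunken-data-quality" % "3.2.1"
```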

If neither of these options works for you, you can download one of the compiled artifacts from the releases section. Alternatively, you can build DDQ from source.

Using DDQ

Getting Started

Create two example tables to play with (or use your existing ones).

import org.apache.spark.sql._
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

case class Customer(id: Int, name: String)
case class Contract(id: Int, customerId: Int, duration: Int)

val customers = sc.parallelize(List(
  Customer(0, "Frank"),
  Customer(1, "Alex"),
  Customer(2, "Slavo")
)).toDF

val contracts = sc.parallelize(List(
  Contract(0, 0, 5),
  Contract(1, 0, 10),
  Contract(0, 1, 6)
)).toDF

Run some checks and see the results on the console.

import de.frosner.ddq.core._

Check(customers)
  .hasNumRows(_ >= 3)
  .hasUniqueKey("id")
  .run()

Check(contracts)
  .hasNumRows(_ > 0)
  .hasUniqueKey("id", "customerId")
  .satisfies("duration > 0")
  .hasForeignKey(customers, "customerId" -> "id")
  .run()
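To make the meaning of these constraints concrete, here is a plain-Scala sketch without Spark that evaluates the same conditions on in-memory copies of the example data. The object ConstraintModel and its helpers isUniqueKey and isForeignKey are hypothetical names for illustration only; DDQ evaluates its constraints on Spark DataFrames, and its implementation may differ.

```scala
// Plain-Scala model (no Spark) of what the checks above assert on the example data.
// All names here are hypothetical; this is not DDQ's implementation.
object ConstraintModel {
  final case class Customer(id: Int, name: String)
  final case class Contract(id: Int, customerId: Int, duration: Int)

  val customers = Seq(Customer(0, "Frank"), Customer(1, "Alex"), Customer(2, "Slavo"))
  val contracts = Seq(Contract(0, 0, 5), Contract(1, 0, 10), Contract(0, 1, 6))

  // Unique key: no two rows share the same key value.
  def isUniqueKey[A, K](rows: Seq[A])(key: A => K): Boolean =
    rows.map(key).distinct.size == rows.size

  // Foreign key: every referencing value occurs among the referenced key values.
  def isForeignKey[A, B, K](rows: Seq[A], referenced: Seq[B])(fk: A => K, pk: B => K): Boolean = {
    val keys = referenced.map(pk).toSet
    rows.forall(row => keys.contains(fk(row)))
  }

  def main(args: Array[String]): Unit = {
    println(s"customers: numRows >= 3 -> ${customers.size >= 3}")
    println(s"customers: unique id -> ${isUniqueKey(customers)(_.id)}")
    println(s"contracts: unique (id, customerId) -> ${isUniqueKey(contracts)(c => (c.id, c.customerId))}")
    println(s"contracts: duration > 0 -> ${contracts.forall(_.duration > 0)}")
    println(s"contracts: customerId references customers.id -> ${isForeignKey(contracts, customers)(_.customerId, _.id)}")
  }
}
```

Note that in the example data the contract id alone is not unique (two contracts have id 0), while the composite key (id, customerId) is, which is why the contracts check uses a composite unique key.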

Custom Reporters

By default the check result will be printed to stdout using ANSI escape codes to highlight the output. To have a report in another format, you can specify one or more custom reporters.

import de.frosner.ddq.reporters.MarkdownReporter

Check(customers)
  .hasNumRows(_ >= 3)
  .hasUniqueKey("id")
  .run(MarkdownReporter(System.err))
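Reporters separate evaluating a check from rendering its result. The following plain-Scala sketch illustrates that idea under stated assumptions: ReporterSketch, SimpleReporter, ConstraintOutcome, PlainTextReporter and MarkdownListReporter are hypothetical stand-ins, not DDQ's reporter API, and their output only loosely imitates console and Markdown reports.

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

// Hypothetical, simplified model of the reporter idea: one check result,
// rendered in different formats to any java.io.PrintStream.
object ReporterSketch {
  final case class ConstraintOutcome(description: String, satisfied: Boolean)

  trait SimpleReporter {
    def report(checkName: String, outcomes: Seq[ConstraintOutcome]): Unit
  }

  // Plain text, one line per constraint.
  final class PlainTextReporter(out: PrintStream) extends SimpleReporter {
    def report(checkName: String, outcomes: Seq[ConstraintOutcome]): Unit = {
      out.println(s"Checking $checkName")
      outcomes.foreach { o =>
        out.println(s"  [${if (o.satisfied) "OK" else "FAILED"}] ${o.description}")
      }
    }
  }

  // Markdown, one bullet per constraint.
  final class MarkdownListReporter(out: PrintStream) extends SimpleReporter {
    def report(checkName: String, outcomes: Seq[ConstraintOutcome]): Unit = {
      out.println(s"**Checking $checkName**")
      outcomes.foreach { o =>
        out.println(s"- *${if (o.satisfied) "SUCCESS" else "FAILURE"}*: ${o.description}")
      }
    }
  }

  // Render into an in-memory buffer instead of stdout/stderr and return the text.
  def renderToString(makeReporter: PrintStream => SimpleReporter,
                     checkName: String,
                     outcomes: Seq[ConstraintOutcome]): String = {
    val buffer = new ByteArrayOutputStream()
    val out = new PrintStream(buffer, true, "UTF-8")
    makeReporter(out).report(checkName, outcomes)
    out.close()
    buffer.toString("UTF-8")
  }
}
```

Because each reporter in this sketch depends only on a PrintStream, the same reporter can write to System.out, System.err, a file, or an in-memory buffer, which is convenient when a report has to be captured as a string.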

Running multiple checks

You can use a runner to generate reports for multiple checks at once. It will execute all the checks and report the results to the specified reporters.

import de.frosner.ddq.reporters.{ConsoleReporter, MarkdownReporter}
import java.io.{PrintStream, File}

val check1 = Check(customers)
  .hasNumRows(_ >= 3)
  .hasUniqueKey("id")

val check2 = Check(contracts)
  .hasNumRows(_ > 0)
  .hasUniqueKey("id", "customerId")
  .satisfies("duration > 0")
  .hasForeignKey(customers, "customerId" -> "id")

val consoleReporter = new ConsoleReporter(System.out)
val markdownMd = new PrintStream(new File("report.md"))
val markdownReporter = new MarkdownReporter(markdownMd)

// Close the report file even if running the checks throws an exception
try {
  Runner.run(Seq(check1, check2), Seq(consoleReporter, markdownReporter))
} finally {
  markdownMd.close()
}
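Conceptually, a runner evaluates each check once, hands the same result to every reporter, and returns all results for further inspection. Below is a hypothetical plain-Scala sketch of that pattern; SimpleCheck, SimpleReporter and RunnerSketch.run are illustrative names and do not reflect how DDQ's Runner is actually implemented.

```scala
// Hypothetical sketch of the runner pattern: evaluate every check once, pass the
// same outcome to every reporter, and return all outcomes keyed by check.
// SimpleCheck and the function-typed reporters are stand-ins, not DDQ types.
object RunnerSketch {
  final case class SimpleCheck(name: String, constraints: Seq[(String, () => Boolean)])

  type CheckOutcome = Map[String, Boolean]
  type SimpleReporter = (String, CheckOutcome) => Unit

  def run(checks: Seq[SimpleCheck], reporters: Seq[SimpleReporter]): Map[SimpleCheck, CheckOutcome] =
    checks.map { check =>
      // Each constraint is evaluated exactly once per run.
      val outcome: CheckOutcome =
        check.constraints.map { case (description, evaluate) => description -> evaluate() }.toMap
      // Every reporter receives the same outcome, so the reports cannot disagree.
      reporters.foreach(report => report(check.name, outcome))
      check -> outcome
    }.toMap
}
```

Evaluating each check once in this sketch means that adding another reporter never repeats potentially expensive computations, and the returned map supports programmatic assertions afterwards.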

Unit Tests

You can also use DDQ to write automated quality tests for your data. After running a check or a series of checks, you can inspect the results programmatically.

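// forall is true for a check without constraints, whereas reduce would throw on an empty collection
def allConstraintsSatisfied(checkResult: CheckResult): Boolean =
  checkResult.constraintResults.values.forall {
    case ConstraintSuccess(_) => true
    case _ => false // failures, or any other result type, count as not satisfied
  }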

val results = Runner.run(Seq(check1, check2), Seq.empty)
assert(allConstraintsSatisfied(results(check1)))
assert(allConstraintsSatisfied(results(check2)))
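When per-constraint outcomes are aggregated into a single boolean, the choice between reduce and forall matters for a check that has no constraints. This plain-Scala sketch uses hypothetical helper names and plain booleans instead of DDQ's result types to show the difference.

```scala
// Two ways to combine per-constraint outcomes into one boolean.
// AggregationSketch and its methods are hypothetical, for illustration only.
object AggregationSketch {
  // reduce has no starting value, so it throws on an empty sequence.
  def allSatisfiedViaReduce(flags: Seq[Boolean]): Boolean = flags.reduce(_ && _)

  // forall is vacuously true for an empty sequence and stops at the first false.
  def allSatisfiedViaForall(flags: Seq[Boolean]): Boolean = flags.forall(identity)
}
```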

If you want the data load to fail when the row count or unique key constraint is violated, while tolerating violations of the duration constraint, you can assert on each constraint result individually.

val numRowsConstraint = Check.hasNumRows(_ >= 3)
val uniqueKeyConstraint = Check.hasUniqueKey("id", "customerId")
val durationConstraint = Check.satisfies("duration > 0")

val check = Check(contracts)
  .addConstraint(numRowsConstraint)
  .addConstraint(uniqueKeyConstraint)
  .addConstraint(durationConstraint)

val results = Runner.run(Seq(check), Seq.empty)
val constraintResults = results(check).constraintResults
assert(constraintResults(numRowsConstraint).isInstanceOf[ConstraintSuccess])
assert(constraintResults(uniqueKeyConstraint).isInstanceOf[ConstraintSuccess])
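The pattern above amounts to splitting constraints into required ones, which can block a load, and optional ones, which are only reported. A minimal plain-Scala sketch of such a gate, assuming the outcomes have already been reduced to a map from a constraint description to a satisfied flag; LoadGate, violatedRequired and shouldLoad are hypothetical names.

```scala
// Hypothetical load gate: only constraints listed as required can block the load.
// A required constraint without an outcome is treated as violated.
object LoadGate {
  def violatedRequired(outcomes: Map[String, Boolean], required: Set[String]): Set[String] =
    required.filterNot(name => outcomes.getOrElse(name, false))

  def shouldLoad(outcomes: Map[String, Boolean], required: Set[String]): Boolean =
    violatedRequired(outcomes, required).isEmpty
}
```

Returning the set of violated required constraints, rather than only a boolean, makes it easy to include the reason in the error raised when a load is rejected.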

Python API

To use the Python API, start PySpark with the DDQ jar on the driver class path. Unfortunately, the --packages option does not work for this in Spark versions below 2.0.

pyspark --driver-class-path drunken-data-quality_2.10-x.y.z.jar

Then you can create a dummy dataframe and run a few checks.

from pyddq.core import Check

df = sqlContext.createDataFrame([(1, "a"), (1, None), (3, "c")])
check = Check(df)
check.hasUniqueKey("_1", "_2").isNeverNull("_1").run()

Like the Scala version of DDQ, PyDDQ supports multiple reporters. To provide their output destinations, you can use pyddq.streams, which wraps Java output streams.

from pyddq.reporters import MarkdownReporter, ConsoleReporter
from pyddq.streams import FileOutputStream, ByteArrayOutputStream
import sys

# send the report in a console-friendly format to the standard output
# and in Markdown format to the byte array stream
stdout_stream = FileOutputStream(sys.stdout)
bytearray_stream = ByteArrayOutputStream()

Check(df)\
    .hasUniqueKey("_1", "_2")\
    .isNeverNull("_1")\
    .run([MarkdownReporter(bytearray_stream), ConsoleReporter(stdout_stream)])

# print markdown report
print(bytearray_stream.get_output())

Documentation

For a comprehensive list of available constraints, please refer to the Wiki.

Authors

License

This project is licensed under the Apache License Version 2.0. For details please see the file called LICENSE.