A set of utilities for effortless data quality checks built on top of deequ.
Data Quality Suite (DQS) provides a configuration-based approach to running data quality checks, ensuring the rules are decoupled from transformation logic.
The suite is designed with the data quarantining principle in mind: bad data is caught early and quarantined so that it does not break downstream systems. This requires proper protocols for handling quarantined data, such as raising issues and notifying the data owners.
To support the above principles, the suite provides the following features:
- Data Profiling and Data Quality Check suggestions
- Anomaly Detection
- Storage of Quality Metrics on S3 or Timestream for observability
- Decoupled per dataset Metric and Check configuration
- Runtime agnostic data quality utilities
- Python support
To store metrics, the repository concept from [deequ] is used. Metrics are produced by both the profiler and the analyzer.
There are two types of repositories that can be used:
- Persistent Repository: stores metrics persistently and can be used for retrieval when using features such as anomaly detection.
  - `FileSystemMetricsRepository`: stores metrics in a local file system, HDFS or S3.
- Ephemeral Repository: stores metrics in storage that does not support their retrieval within [deequ] across runs.
  - `InMemoryMetricsRepository`: stores metrics in memory.
  - `TimestreamMetricsRepository`: stores metrics in AWS Timestream.
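For illustration, the persistent repository maps onto the plain deequ metrics repository concept. A minimal sketch using the deequ API directly (not the DQS wrapper; the bucket path, tags, and the `spark`/`df` variables are assumptions):

```scala
import com.amazon.deequ.analyzers.Size
import com.amazon.deequ.analyzers.runners.AnalysisRunner
import com.amazon.deequ.repository.ResultKey
import com.amazon.deequ.repository.fs.FileSystemMetricsRepository

// A persistent repository backed by S3 (local paths and HDFS work the same way)
val repository = FileSystemMetricsRepository(spark, "s3://my-bucket/metrics/sales.json")

// Each run is stored under a key so its metrics can be retrieved later,
// e.g. by anomaly detection
val resultKey = ResultKey(System.currentTimeMillis(), Map("dataset" -> "sales"))

AnalysisRunner
  .onData(df)
  .addAnalyzer(Size())
  .useRepository(repository)
  .saveOrAppendResult(resultKey)
  .run()
```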
The results include check results and check suggestion results. These can be stored using:
- `FileSystemResultPersistor`: stores results in a local file system, HDFS or S3.
The suite is configured using a YAML file that contains the check configuration for multiple datasets / data sources. See the example below:
```yaml
sources:
  sales:
    schema:
      - column: "Invoice/Item Number"
        type: string
        is_nullable: false
      - column: "date"
        type: timestamp
        mask: "yyyy-MM-dd"
      - column: "Volume Sold (Liters)"
        type: decimal
        precision: 12
        scale: 2
    analyzers:
      - column: "Dataset"
        expression: Size()
      - column: "Invoice/Item Number"
        expression: Completeness(@)::Uniqueness(@)
    checks:
      - column: "Dataset"
        level: error
        name: "Dataset must not be empty"
        expression: .hasSize(n => n > 0, Some("Dataset must not be empty"))
      - column: "Date"
        level: warning
        name: "Date validation"
        expression: >
          .hasPattern(@, "^[0-9]{4}-[0-9]{2}-[0-9]{2}$".r)
          .isComplete(@)
    anomaly_detection:
      - column: "Dataset"
        level: error
        description: "Dataset size must not be anomalous"
        expression: Size()
        strategy: OnlineNormalStrategy(lowerDeviationFactor = Some(3.0), upperDeviationFactor = Some(3.0))
        window: 604800
```
- Sources: Data source configurations are grouped under the `sources` key, with a unique name for each data source used during the run. Each data source has configuration for `schema`, `analyzers`, `checks` and `anomaly_detection`.
Schema is used to check input data against the expected schema.
- column: Column name to run the check on.
- required: Whether the column is required or not. (default: true)
- is_nullable: Whether the column can be null or not. (default: true)
- type: Type of the column. (string, integer, decimal, timestamp, expression)
- alias: Used in `postprocess` to rename the column. Not used during the schema check.
- For string columns:
  - min_length (optional): Minimum length of the string column. (default: 0)
  - max_length (optional): Maximum length of the string column. (default: Infty)
  - matches (optional): Regex pattern to check the data against. (default: None)
- For integer columns:
  - min_value (optional): Minimum value of the integer column. (default: 0)
  - max_value (optional): Maximum value of the integer column. (default: Infty)
- For decimal columns:
  - precision: Precision of the decimal column.
  - scale: Scale of the decimal column.
- For timestamp columns:
  - mask: Mask used to parse the timestamp column.
- For expression columns:
  - expression: A valid deequ `RowLevelSchema` method.
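For example, a string column combining the length and pattern constraints listed above might be declared like this (the column name is illustrative):

```yaml
schema:
  - column: "Store Number"
    type: string
    is_nullable: false
    min_length: 1
    max_length: 10
    matches: "^[0-9]+$"
```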
Analyzers are used to produce metrics that can be used for anomaly detection.
- column: Column name to run the analyzer on.
- expression: Deequ analyzer expression to run on the column. (`@` is substituted with the column name)
  - Use `::` to chain multiple analyzers.
- enabled: Whether to run the analyzer or not. (default: true)
- See deequ Analyzers for options.
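For instance, two plain deequ analyzers chained on one of the columns from the example above (whether these metrics are useful depends on your data):

```yaml
analyzers:
  - column: "Volume Sold (Liters)"
    expression: Mean(@)::StandardDeviation(@)
    enabled: true   # default
```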
Checks are used to validate the data quality.
- column: Column name to run the check on.
- level: Level of the check. (error, warning, info)
- name: Name of the check.
- expression: Deequ check expression to run on the column. (`@` is substituted with the column name)
- enabled: Whether to run the check or not. (default: true)
- See deequ Checks for options.
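For example, a warning-level check built from deequ's `isNonNegative` constraint could look like this (illustrative, assuming the constraint is reachable through the check expression as in the example above):

```yaml
checks:
  - column: "Volume Sold (Liters)"
    level: warning
    name: "Volume must be non-negative"
    expression: .isNonNegative(@)
```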
Anomaly detection is used to detect anomalies in the metrics produced by the analyzers. It acts as an additional check.
- column: Column name to run the check on.
- level: Level of the check. (error, warning, info)
- description: Description of the anomaly.
- expression: Deequ analyzer expression to run on the column. (`@` is substituted with the column name)
- strategy: Strategy to use for anomaly detection.
  - See deequ Anomaly Detection Strategies for options.
- window: Window (in seconds) to use for historical metric gathering. (default: infinite)
- enabled: Whether to run the check or not. (default: true)
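As another sketch, the same dataset-size metric could be bounded with deequ's `RelativeRateOfChangeStrategy` over the last day of metrics (the thresholds are illustrative):

```yaml
anomaly_detection:
  - column: "Dataset"
    level: warning
    description: "Dataset size must not halve or double between runs"
    expression: Size()
    strategy: RelativeRateOfChangeStrategy(maxRateDecrease = Some(0.5), maxRateIncrease = Some(2.0))
    window: 86400
```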
In Scala, create the context by providing additional parameters such as result paths and repository configuration:
```scala
// Configure DQSuite
val dqsContext = DQSuiteContextBuilder.builder
  .withConfigPath(args("config_path"))
  .withResultPath("./out/results")
  .withMetricsPath("./out/metrics")
  .withSparkSession(spark)
  .build
```
Select the source / source configuration you will be using for profiling:
```scala
val dsContext = dqsContext.withDataset("sales")
```
Check schema and data types of your data:
```scala
val schemaCheckResult = dsContext.checkSchema(dfRaw)
assert(schemaCheckResult.isValid)
```
Data profiling computes metrics on your data, inferred from some basic rules and the data schema. It emits the profiling results as well as suggestions for checks you may want to configure.
```scala
val profilingResult = dsContext.profile(df)
logger.info(s"Profiling finished. Used ${profilingResult.numRecordsUsedForProfiling} for profiling")
```
Validation runs configured metrics, checks and anomaly detection against your data. Run it using:
```scala
val validationResult = dsContext.validate(df)
validationResult.status match {
  case CheckStatus.Success => logger.info("Validation succeeded")
  case CheckStatus.Warning => logger.warn("Validation succeeded with warnings")
  case CheckStatus.Error   => throw new RuntimeException("Validation failed")
}
```
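Following the quarantining principle described above, you may prefer to divert a failing batch instead of throwing. A minimal sketch of that alternative (the S3 paths and the `notifyDataOwners` helper are assumptions, not part of the suite):

```scala
validationResult.status match {
  case CheckStatus.Error =>
    // Quarantine the batch so downstream systems are not affected (path is an assumption)
    df.write.mode("append").parquet("s3://my-bucket/quarantine/sales/")
    notifyDataOwners("sales", validationResult) // hypothetical helper: raise an issue / alert the owners
  case _ =>
    // Warnings and successes continue to the curated zone
    df.write.mode("append").parquet("s3://my-bucket/curated/sales/")
}
```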
Finally, you can apply postprocessing for quick data transformations. The following transformations are supported:
- Column renaming (if `alias` is specified in the schema config)

```scala
val dfPostProcessed = dsContext.postprocess(df)
```
In Python, create the context by providing additional parameters such as result paths and repository configuration:
```python
# Configure DQSuite
dqsContext = (
    DQSuiteContextBuilder.builder(spark)
    .withConfigPath(config_path)
    .withResultPath("./out/results")
    .withMetricsPath("./out/metrics")
    .build()
)
```
Select the source / source configuration you will be using for profiling:
```python
dsContext = dqsContext.withDataset("sales")
```
Check schema and data types of your data:
```python
schemaCheckResult = dsContext.checkSchema(dfRaw)
assert schemaCheckResult.isValid
```
Data profiling computes metrics on your data, inferred from some basic rules and the data schema. It emits the profiling results as well as suggestions for checks you may want to configure.
```python
profilingResult = dsContext.profile(df)
print(f"Profiling finished. Used {profilingResult.numRecordsUsedForProfiling} for profiling")
```
Validation runs configured metrics, checks and anomaly detection against your data. Run it using:
```python
validationResult = dsContext.validate(df)
if validationResult.status == "Error":
    raise Exception("Validation failed")
elif validationResult.status == "Warning":
    print("Validation succeeded with warnings")
else:
    print("Validation succeeded")
```
Finally, you can apply postprocessing for quick data transformations. The following transformations are supported:
- Column renaming (if `alias` is specified in the schema config)

```python
dfPostProcessed = dsContext.postprocess(df)
```
Download the latest release from the releases page.
- Place the `dataquality-suite-bundle_2.12-0.1.jar` and `deequ-2.0.3-spark-3.3.jar` jars in your scripts S3 bucket.
- Create a new Glue job, select the Scala script and specify your script sources.
- Add the following job parameters so Glue loads the libs at runtime:

```json
{
  "--extra-jars": "s3://<bucket>/dataquality-suite-bundle_2.12-0.1.jar,s3://<bucket>/deequ-2.0.3-spark-3.3.jar"
}
```
- See the `examples` folder for examples of how to use the suite.
If you want to use the Data Quality Suite in your Python scripts, you need additional Python dependencies.
- Place the `dataquality-suite-bundle_2.12-0.1.jar` and `deequ-2.0.3-spark-3.3.jar` jars in your scripts S3 bucket.
- Place the `dqsuite-0.1.0-py3-none-any.whl` wheel in your scripts S3 bucket.
- Create a new Glue job, select the Python script and specify your script sources.
- Add the following job parameters so Glue loads the libs at runtime:

```json
{
  "--extra-jars": "s3://<bucket>/dataquality-suite-bundle_2.12-0.1.jar,s3://<bucket>/deequ-2.0.3-spark-3.3.jar",
  "--additional-python-modules": "s3://<bucket>/dqsuite-0.1.0-py3-none-any.whl"
}
```
- See the `python/examples` folder for examples of how to use the suite.
Run Scala examples using the following command:
```shell
spark-submit \
  --master local --deploy-mode client \
  --class ETLExample --name dataquality \
  --jars core/target/scala-2.12/deequ-2.0.3-spark-3.3.jar,core/target/scala-2.12/dqsuite-bundle_2.12-0.2.0.jar \
  examples/target/scala-2.12/examples_2.12-0.2.0.jar \
  --input_file_path=examples/data/iowa_liquor_sales_demo/iowa_liquor_sales_01.csv \
  --config_path=examples/data/example.yml
```
The results will be written to `./out/results` and metrics to `./out/metrics`.
Run Python examples using the following command:
```shell
cd python
spark-submit \
  --jars ../core/target/scala-2.12/deequ-2.0.3-spark-3.3.jar,../core/target/scala-2.12/dqsuite-bundle_2.12-0.2.0.jar \
  ./examples/etl_example.py \
  --input_file_path=../examples/data/iowa_liquor_sales_demo/iowa_liquor_sales_01.csv \
  --config_path=../examples/data/example.yml
```
The results will be written to `./out/results` and metrics to `./out/metrics`.