awslabs/deequ

Filter or quarantine data based on row-level checks

riccardodelega opened this issue · 7 comments

Is there any reason why Deequ has no feature to filter or quarantine data based on the Checks that are defined in a VerificationSuite?

I understand that some Constraints, such as DistinctnessConstraint or UniquenessConstraint, relate to the whole DataFrame and thus can only be used to either discard or keep the whole batch. However, some of them, such as PatternMatchConstraint or MaxLengthConstraint, could be used to filter the DataFrame and discard the rows that do not pass these row-level, non-aggregated constraints.

Is there any plan to extend Deequ with this feature, or would it require massive refactoring of the code base because this was not Deequ's intended purpose? I briefly checked the code base and it seems like it would be hard, because ScanShareableAnalyzer only exposes aggregationFunctions, which is a sequence of aggregated SparkSQL Columns, and e.g. the PatternMatch Analyzer does not expose the non-aggregated SparkSQL expressions that use regexp_extract. A sketch of the kind of row-level filtering I have in mind is shown below.
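For illustration, this is roughly what I would like to express, written in plain Spark rather than Deequ's API (the column name and pattern are made up for the example):

```scala
import org.apache.spark.sql.functions.{col, regexp_extract}

// Hypothetical example: keep only rows whose "product_code" matches the pattern
// and quarantine the rest. This is the non-aggregated expression that a
// PatternMatch check evaluates internally before aggregating it into a ratio.
val pattern = "^[A-Z]{3}-\\d{4}$"
val matches = regexp_extract(col("product_code"), pattern, 0) =!= ""

val goodRows = df.filter(matches)
val quarantinedRows = df.filter(!matches)
```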

Hi,

In recent pull requests, we have added row-level support for certain Constraints:

  1. Completeness and MaxLength: #451
  2. MinLength: #465
  3. Uniqueness and related (primary key, unique value ratio, etc): #471
  4. Max, Min, and PatternMatch: #476

These pull requests also include tests that show how a VerificationSuite can be used to return a new column per check. You can see an example here:
https://github.com/awslabs/deequ/blob/master/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala#L231
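In case it helps, here is a minimal sketch of that usage (data and column names are made up, and the exact signature of rowLevelResultsAsDataFrame should be checked against the linked test):

```scala
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.checks.{Check, CheckLevel}

val check = Check(CheckLevel.Error, "row-level checks")
  .isComplete("name")            // Completeness: row-level support added in #451
  .hasMaxLength("name", _ <= 20) // MaxLength: row-level support added in #451

val result = VerificationSuite()
  .onData(data)
  .addChecks(Seq(check))
  .run()

// Returns the input data with one boolean column per check;
// rows that fail a check are marked false.
val rowLevelResults = VerificationResult.rowLevelResultsAsDataFrame(spark, result, data)
rowLevelResults.show()
```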

We are working on adding support for more checks, and we're merging them as we complete the work required for each.

Is this what you were looking for?

I think only the first PR managed to get included in release 2.0.3-spark-3.3, which is the one I am testing, but it seems to be exactly what I was looking for! I am glad you are working towards including new checks too.
It would be nice if VerificationResult.rowLevelResultsAsDataFrame did not require recomputing the checks, but I guess doing it in a single sweep (e.g. when the suite is run) would require a lot of refactoring.

@mentekid Hi, is there any plan to release the new record-level features in the Deequ artifact? Thanks.

@torontoairbnb while I'm waiting for this new row-level feature to be finished and released, I packaged the JAR myself from the latest commit on master and added it as an external JAR dependency to my project.

Yes, we are working on completing the support for eligible Deequ checks, and we will make a new release with this and the other features we have added since release 2.0.3.

@riccardodelega

It would be nice if VerificationResult.rowLevelResultsAsDataFrame did not require recomputing the checks, but I guess doing it in a single sweep (e.g. when the suite is run) would require a lot of refactoring.

You are right. This approach keeps each analyzer standalone and lets them share scans wherever possible, which is how Deequ optimizes its Spark usage while keeping classes decoupled. Not every user needs row-level features, and our benchmarking shows that providing this information is slower, so this was the best solution we could come up with that doesn't degrade the performance of what's already there.

However, the current implementation still makes the same number of passes over the data for row-level results as it would for the analysis results alone: it adds new columns to the DataFrame, but as long as actions are only triggered after the analysis, only the minimum number of passes is needed.
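To illustrate the point with plain Spark (made-up column names, not Deequ internals): adding the row-level columns is a lazy transformation, so the extra columns do not by themselves cost an extra scan.

```scala
import org.apache.spark.sql.functions.{col, length, not}

// Lazy transformations: no pass over the data happens here
val withRowLevel = data
  .withColumn("name_is_complete", col("name").isNotNull)
  .withColumn("name_within_max_length", length(col("name")) <= 20)

// Only this action triggers a scan, and both columns are computed in the same pass
val badRows = withRowLevel
  .filter(not(col("name_is_complete")) || not(col("name_within_max_length")))
badRows.show()
```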

If you spot some inefficiency, however, or you think there's something we can improve, let us know or feel free to open a PR.

@mentekid We are currently using the bad-record filtering functionality: a new column is added with true or false based on the row-level checks. However, if a rule has a where filter condition, the filter condition is ignored.

I have opened a separate issue for this: #530
Could you please advise on how to fix it? A sketch of the kind of rule we mean follows.
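For context, this is roughly the shape of such a rule (column names are made up for the example); the expectation is that rows excluded by the where clause would not be marked as failing in the row-level results:

```scala
import com.amazon.deequ.checks.{Check, CheckLevel}

// A completeness rule that is only supposed to apply to a subset of rows.
// The where filter restricts the check to rows with country = 'US', but the
// row-level result column currently flags non-matching rows as well.
val filteredCheck = Check(CheckLevel.Error, "filtered completeness")
  .isComplete("zip_code")
  .where("country = 'US'")
```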