how to run all suggested checks in pydeequ


I have just started with PyDeequ and I want to create checks for a Spark DataFrame that has ~1800 features. To know which checks I should perform, I do the following:

from pydeequ.suggestions import ConstraintSuggestionRunner, DEFAULT

suggestionResult = ConstraintSuggestionRunner(spark) \
    .onData(df) \
    .addConstraintRule(DEFAULT()) \
    .run()

Following the above, I get suggestions for all the checks that I could run on my data. Now the goal is twofold:

  1. I may want to run all the checks suggested by suggestionResult.
  2. I may want to run a particular check, e.g. a NonNegative or Unique check, for a series of features.

I am completely unsure how to do this. After trying several approaches, it still doesn't work. I know it's certainly possible to run all suggested checks at once, but only in Scala (see this); as per point 1, I need to do it in PyDeequ.

I did attempt the following, but it didn't work; it gave me an error about duplicate analyzers:

from functools import reduce

check_list = [check.isNonNegative, check.isPositive]  # check is a pydeequ Check object
checkResultBuilder = VerificationSuite(spark).onData(df)
for col in sub_cols:
    checkResultBuilder = reduce(
        lambda vbuilder, checker: vbuilder.addCheck(checker(col)),
        check_list,
        checkResultBuilder)

checkResultBuilder.run()
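
Note: one likely cause of the duplicate-analyzer error is that the code above adds the same Check object to the suite once per constraint and column. A minimal sketch of a workaround, assuming spark, df, and sub_cols as above, is to chain every constraint onto a single Check and add it to the suite once:

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite

# One Check object accumulates all constraints and is added to the suite once
check = Check(spark, CheckLevel.Warning, "Feature checks")
for col in sub_cols:
    check = check.isNonNegative(col).isPositive(col)

result = (VerificationSuite(spark)
          .onData(df)
          .addCheck(check)
          .run())
VerificationResult.checkResultsAsDataFrame(spark, result).show(truncate=False)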

There are 2 answers below.

Answer 1

If helpful for anyone, here's a full example showing how to generate suggested data quality constraints and then check all of them.

Note: this example uses PyDeequ, the Python implementation of Scala's Deequ; the two expose very similar APIs. I built this solution partially off @mlin's solution.

First, let's create a string that is a concatenation of all the suggested constraints:

import json

import pydeequ
from pydeequ.suggestions import *
from pyspark.sql import SparkSession

# Creating a Spark session with the Deequ jar on the classpath
spark = (SparkSession.builder
         .config("spark.jars.packages", pydeequ.deequ_maven_coord)
         .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
         .getOrCreate())

# Reading data
df = spark.read.load("path/to/data/here")

# Getting suggested data quality constraint rules
suggested_constraints = \
(ConstraintSuggestionRunner(spark)
 .onData(df)
 .addConstraintRule(CompleteIfCompleteRule())
 .addConstraintRule(NonNegativeNumbersRule())
 .addConstraintRule(RetainCompletenessRule())
 .addConstraintRule(RetainTypeRule())
 .addConstraintRule(UniqueIfApproximatelyUniqueRule())
 .run())

# Printing suggested constraints in JSON format
print(json.dumps(suggested_constraints, indent=2))

# Printing suggested constraints in more readable and understandable format
for suggestion in suggested_constraints['constraint_suggestions']:
  print("-----")
  print(f"Suggested constraint for \'{suggestion['column_name']}\': {suggestion['description']}")
  print(f"The description for this rule is: \'{suggestion['rule_description']}\'")
  print(f"The corresponding Python code is: `{suggestion['code_for_constraint']}`")

# Creating empty string to concatenate against
pydeequ_validation_string = ""

# Building string from suggestions
for suggestion in suggested_constraints['constraint_suggestions']:
  pydeequ_validation_string = pydeequ_validation_string + suggestion["code_for_constraint"]
  
# Printing the validation string
# If desired, edit this string to control what data quality validations are performed
print(pydeequ_validation_string)

At this point, our pydeequ_validation_string might look like:

.isComplete("column_1").isComplete("column_2").isComplete("column_3").isNonNegative("column_1")

Now, let's take our pydeequ_validation_string and use it to check all of these constraints at once. Here's a function that does this. Note: I first prefix the string with "check", then use Python's eval to evaluate the resulting string as code.

import logging

from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes
from pydeequ.verification import VerificationResult, VerificationSuite
from pyspark.sql import functions as F, SparkSession

logger = logging.getLogger(__name__)

def perform_checks(df, pydeequ_validation_string):
    """Perform and log data quality checks."""

    # Initializing
    check = \
        Check(spark_session=spark,
              level=CheckLevel.Warning,
              description="Data Quality Check")

    # Building validation string of constraints to check
    pydeequ_validation_string_to_check = "check" + pydeequ_validation_string

    # Checking constraints
    checked_constraints = \
        (VerificationSuite(spark)
         .onData(df)
         .addCheck(eval(pydeequ_validation_string_to_check))
         .run())

    # Returning results as DataFrame
    df_checked_constraints = \
        (VerificationResult
         .checkResultsAsDataFrame(spark, checked_constraints))

    logger.info(
        df_checked_constraints.show(n=df_checked_constraints.count(),
                                    truncate=False)
    )

    # Filtering for any failed data quality constraints
    df_checked_constraints_failures = \
        (df_checked_constraints
         .filter(F.col("constraint_status") == "Failure"))

    # If any data quality check fails, raise exception
    if df_checked_constraints_failures.count() > 0:
        logger.info(
            df_checked_constraints_failures.show(n=df_checked_constraints_failures.count(),
                                                 truncate=False)
        )
        raise ValueError("Data quality checks failed")

    return df_checked_constraints
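
For reference, a hypothetical invocation of the function above, assuming the DataFrame and the pydeequ_validation_string built earlier are in scope (the name df_results is my own):

df_results = perform_checks(df, pydeequ_validation_string)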

Answer 2

You can use the method listed here: https://github.com/awslabs/python-deequ/issues/23. Pass the arguments as a list called args and unpack it with *args.

The ConstraintSuggestionRunner returns a dictionary whose constraint_suggestions key holds the suggested constraints; with a little work you can unpack the details nested further inside.

Use eval(str(item)) to turn the string form of the extra parameters into proper Python objects, and getattr(check, constraint) to look up and add the constraint by name:

# `constraint` holds the method name (e.g. "isComplete") and `parameters`
# its raw arguments, both parsed out of the suggestion dictionary
for item in parameters:
    args.append(eval(str(item)))

check.addConstraint(getattr(check, constraint)(*args))
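
Putting these pieces together, here is a minimal end-to-end sketch of this approach. It is not from the original answer: it assumes spark, df, and a suggested_constraints dictionary from the ConstraintSuggestionRunner are already in scope, and it naively parses each suggestion's code_for_constraint string (e.g. .isComplete("column_1")) into a method name and an argument tuple:

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite

check = Check(spark, CheckLevel.Warning, "Suggested checks")

for suggestion in suggested_constraints["constraint_suggestions"]:
    code = suggestion["code_for_constraint"]    # e.g. '.isComplete("column_1")'
    name, raw_args = code[1:].split("(", 1)     # -> 'isComplete', '"column_1")'
    raw_args = raw_args.rstrip(")")
    # Evaluating the argument list as a Python tuple (empty if no arguments)
    args = eval(f"({raw_args},)") if raw_args else ()
    # The fluent call registers the constraint on the Check object
    getattr(check, name)(*args)

result = (VerificationSuite(spark)
          .onData(df)
          .addCheck(check)
          .run())
VerificationResult.checkResultsAsDataFrame(spark, result).show(truncate=False)

This sidesteps eval-ing whole method calls, but the naive string parsing will break on constraints whose arguments themselves contain parentheses, so treat it as a starting point.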