Load constraints from a CSV file (Amazon Deequ)


I'm checking out Deequ, which seems like a really nice library. I was wondering if it is possible to load constraints from a CSV file or an ORC table in HDFS?

Let's say I have a table with these types:

case class Item(
  id: Long,
  productName: String,
  description: String,
  priority: String,
  numViews: Long
)

and I want to put constraints like:

val checks = Check(CheckLevel.Error, "unit testing my data")
                  .isComplete("id") // should never be NULL
                  .isUnique("id") // should not contain duplicates

But I want to load the ".isComplete("id")" and ".isUnique("id")" parts from a CSV file, so the business can add the constraints and we can run the tests based on their input:

val verificationResult = VerificationSuite()
  .onData(data)
  .addChecks(Seq(checks))
  .run()

I've managed to get the constraints from suggestionResult.constraintSuggestions:

val allConstraints = suggestionResult.constraintSuggestions
      .flatMap { case (_, suggestions) => suggestions.map { _.constraint }}
      .toSeq

which gives a list like, for example:

allConstraints = List(CompletenessConstraint(Completeness(id,None)), ComplianceConstraint(Compliance('id' has no negative values,id >= 0,None)))

However, that list is generated from suggestionResult.constraintSuggestions. I want to be able to create a list like that based on the inputs from a CSV file. Can anyone help me?

To sum things up, I basically just want to add:

val checks = Check(CheckLevel.Error, "unit testing my data")
  .isComplete("columnName1")
  .isUnique("columnName1")
  .isComplete("columnName2")

dynamically, based on a file that contains, for example:

columnName;isUnique;isComplete (header)
columnName1;true;true
columnName2;false;true

There are 2 answers below.

Answer 1 (7 votes):

It depends on how complicated you want to allow the constraints to be. In general, Deequ allows you to use arbitrary Scala code for the validation function of a constraint, so it's difficult (and dangerous from a security perspective) to load that from a file.

I think you would have to come up with your own schema and semantics for the CSV file; it is not directly supported in Deequ.
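
A minimal sketch of that idea, using the semicolon-separated format from the question, could look like the following. The file name, the delimiter, and the true/false flag handling are assumptions taken from the example above, not anything Deequ provides out of the box:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}
import org.apache.spark.sql.SparkSession

// Reads a semicolon-separated rules file (columnName;isUnique;isComplete)
// and folds every row into a single Check.
def checksFromCsv(spark: SparkSession, path: String): Check = {
  val rules = spark.read
    .option("header", "true")
    .option("delimiter", ";")
    .csv(path)
    .collect()

  rules.foldLeft(Check(CheckLevel.Error, "unit testing my data")) { (check, row) =>
    val column = row.getAs[String]("columnName")
    val withUnique =
      if (row.getAs[String]("isUnique").toBoolean) check.isUnique(column) else check
    if (row.getAs[String]("isComplete").toBoolean) withUnique.isComplete(column)
    else withUnique
  }
}

// Usage, assuming data and spark are in scope as in the question;
// "constraints.csv" is a hypothetical path.
val verificationResult = VerificationSuite()
  .onData(data)
  .addChecks(Seq(checksFromCsv(spark, "constraints.csv")))
  .run()

Anything beyond such simple boolean flags (custom assertions, thresholds, and so on) quickly runs into the arbitrary-code problem described above.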

Answer 2 (1 vote):

I chose to store the CSV in src/main/resources as it's very easy to read from there, and easy to maintain in parallel with the code being QA'ed.

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

def readCSV(spark: SparkSession, filename: String): DataFrame = {
  import spark.implicits._

  // getResourceAsStream returns null (rather than throwing) when the file is missing
  val inputFileStream = Option(this.getClass.getResourceAsStream("/" + filename))
    .getOrElse(
      throw new Exception("Cannot find " + filename + " in src/main/resources")
    )

  val readLines =
    scala.io.Source.fromInputStream(inputFileStream).getLines.toList

  val csvData: Dataset[String] =
    spark.sparkContext.parallelize(readLines).toDS

  spark.read.option("header", true).option("inferSchema", true).csv(csvData)
}
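
For example, assuming the constraint snippets are kept in a file called constraints.csv under src/main/resources (the file name here is just an assumption):

val constraintsDF = readCSV(spark, "constraints.csv")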

This loads the file as a DataFrame, which can then be passed to code like gavincruick's example on GitHub, copied here for convenience:

//code to build a verifier from a DataFrame that has a 'Constraint' column
//(the runtime toolbox below needs scala-reflect and scala-compiler on the classpath)
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
import scala.util.Try
import org.apache.spark.sql.DataFrame
import com.amazon.deequ.VerificationResult

type Verifier = DataFrame => VerificationResult

def generateVerifier(df: DataFrame, columnName: String): Try[Verifier] = {

  val constraintCheckCodes: Seq[String] = df.select(columnName).collect().map(_(0).toString).toSeq

  def checkSrcCode(checkCodeMethod: String, id: Int): String = s"""com.amazon.deequ.checks.Check(com.amazon.deequ.checks.CheckLevel.Error, "$id")$checkCodeMethod"""

  val verifierSrcCode = s"""{
                             |import com.amazon.deequ.constraints.ConstrainableDataTypes
                             |import com.amazon.deequ.{VerificationResult, VerificationSuite}
                             |import org.apache.spark.sql.DataFrame
                             |
                             |val checks = Seq(
                             |  ${constraintCheckCodes.zipWithIndex
                           .map { (checkSrcCode _).tupled }
                           .mkString(",\n  ")}
                             |)
                             |
                             |(data: DataFrame) => VerificationSuite().onData(data).addChecks(checks).run()
                             |}
    """.stripMargin.trim

  println(s"Verification function source code:\n$verifierSrcCode\n")

  compile[Verifier](verifierSrcCode)
}

/** Compiles the scala source code that, when evaluated, produces a value of type T. */
def compile[T](source: String): Try[T] =
  Try {
    val toolbox = currentMirror.mkToolBox()
    val tree = toolbox.parse(source)
    val compiledCode = toolbox.compile(tree)
    compiledCode().asInstanceOf[T]
  }

//example usage...
import spark.implicits._ // needed for .toDF on the local Seqs below

//sample test data
val testDataDF = Seq(
      ("2020-02-12", "England", "E10000034", "Worcestershire", 1),
      ("2020-02-12", "Wales", "W11000024", "Powys", 0),
      ("2020-02-12", "Wales", null, "Unknown", 1),
      ("2020-02-12", "Canada", "MADEUP", "Ontario", 1)
  ).toDF("Date", "Country", "AreaCode", "Area", "TotalCases")

//constraints in a DF
val constraintsDF = Seq(
    ".isComplete(\"Area\")",
    ".isComplete(\"Country\")",
    ".isComplete(\"TotalCases\")",
    ".isComplete(\"Date\")",
    ".hasCompleteness(\"AreaCode\", _ >= 0.80, Some(\"It should be above 0.80!\"))",
    ".isContainedIn(\"Country\", Array(\"England\", \"Scotland\", \"Wales\", \"Northern Ireland\"))"
  ).toDF("Constraint")

//Build Verifier from constraints DF
val verifier = generateVerifier(constraintsDF, "Constraint").get

//Run verifier against a sample DF 
val result = verifier(testDataDF)

//display results
VerificationResult.checkResultsAsDataFrame(spark, result).show()