How do I invoke a Dataform workflow with assertions (data quality tests) from Airflow via GCP Cloud Composer?

I need help running a data quality task from a Cloud Composer DAG, using Dataform assertions (specifically uniqueKey) against BigQuery. The DAG below executes the workflow, but I can't work out how to run the data quality checks as part of the task.

Thanks.

    from datetime import datetime

    from airflow import models
    from airflow.providers.google.cloud.operators.dataform import (
        DataformCreateCompilationResultOperator,
        DataformCreateWorkflowInvocationOperator,
    )

    # Placeholders: replace with your own values.
    DAG_ID = "dataform"
    PROJECT_ID = "PROJECT_ID"
    REPOSITORY_ID = "REPOSITORY_ID"
    REGION = "REGION"
    GIT_COMMITISH = "GIT_COMMITISH"

    with models.DAG(
        DAG_ID,
        schedule_interval='@once',  # Override to match your needs
        start_date=datetime(2022, 1, 1),
        catchup=False,  # Override to match your needs
        tags=['dataform'],
    ) as dag:

        # Compile the Dataform repository at the given branch, tag, or commit.
        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
            },
        )

        # Run the compiled workflow, referencing the compilation result via XCom.
        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
            },
        )

        create_compilation_result >> create_workflow_invocation

There is 1 best solution below

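The assertions are defined in your Dataform repository, not in the DAG: add an assertions block to the config of the table's SQLX file. They are compiled together with the table, so the workflow invocation your DAG creates should run them along with the rest of the workflow.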

Example:

    assertions: {
      uniqueKey: ["comment_id"],
      nonNull: ["comment_text"],
      rowConditions: [
        "total_words > 0"
      ]
    }
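
If you want a DAG task that runs only part of the workflow (for example, just the tables that carry data quality assertions), one option is to pass an invocation_config with included_tags in the workflow_invocation payload. The sketch below only builds that payload as a plain dict. The helper name build_workflow_invocation and the tag "data_quality" are hypothetical, and it assumes you have added that tag to the relevant tables in your Dataform config and that their assertions are selected along with them, which is worth verifying in your own repository.

```python
from typing import Any, Dict, List, Optional


def build_workflow_invocation(
    compilation_result_name: str,
    included_tags: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build a `workflow_invocation` dict for DataformCreateWorkflowInvocationOperator.

    Hypothetical helper, not part of Airflow or Dataform.
    """
    payload: Dict[str, Any] = {"compilation_result": compilation_result_name}
    if included_tags:
        # Restrict the run to actions carrying one of these tags.
        payload["invocation_config"] = {"included_tags": list(included_tags)}
    return payload


# Same Jinja template as in the question's DAG; Airflow renders it at run time.
COMPILATION_RESULT = (
    "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
)

# "data_quality" is an assumed tag name; use whatever tag your tables declare.
workflow_invocation = build_workflow_invocation(
    COMPILATION_RESULT, included_tags=["data_quality"]
)
```

The resulting dict can be passed as workflow_invocation=workflow_invocation to DataformCreateWorkflowInvocationOperator in place of the inline dict in the question. With no tags given, the helper returns the same payload the question already uses, which runs the whole workflow.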