BigQueryInsertJobOperator data_lineage doesn't work on Google Cloud Composer with tableDefinitions


Running in https://github.com/GoogleCloudPlatform/composer-local-dev with composer-2.5.1-airflow-2.6.3, everything works as expected. However, I don't see any data lineage log, so I believe the post-execute hook is not executed there.

When running in a Google Cloud Composer environment without any additional provider changes, for the following code:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

bq_load = BigQueryInsertJobOperator(
    task_id="bq_load",
    depends_on_past=True,
    configuration={
        "query": {
            # add a row number field on the fly during the BigQuery load
            "query": f"SELECT ROW_NUMBER() OVER(), t.*, '{file_name}' AS file_name FROM temp_table t",
            "tableDefinitions": {
                "temp_table": {
                    "sourceUris": [
                        f"gs://{gcs_bucket}/{gcs_input_path}/{file_name}"
                    ],
                    "autodetect": True,
                    "sourceFormat": file_format,
                    "csvOptions": {
                        "skipLeadingRows": 0,
                    },
                }
            },
            "destinationTable": {
                "projectId": bq_project,
                "datasetId": bq_dataset,
                "tableId": f"{bq_table_L0}_{company}",
            },
            "createDisposition": "CREATE_NEVER",
            "writeDisposition": "WRITE_TRUNCATE",
            "allowLargeResults": True,
            "useLegacySql": False,
        },
    },
)

I get this exception:

[2024-01-09, 12:19:05 UTC] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1408, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1562, in _execute_task_with_callbacks
    self.task.post_execute(context=context, result=result)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/lineage/__init__.py", line 77, in wrapper
    ret_val = func(self, context, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1210, in post_execute
    post_execute_prepare_lineage(self, context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/composer/data_lineage/operators/__init__.py", line 98, in post_execute_prepare_lineage
    mixin.post_execute_prepare_lineage(task, context)  # type: ignore[attr-defined]
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/composer/data_lineage/operators/google/cloud/bigquery.py", line 68, in post_execute_prepare_lineage
    inlets = [
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/composer/data_lineage/operators/google/cloud/bigquery.py", line 71, in <listcomp>
    dataset_id=input_table["datasetId"],
KeyError: 'datasetId'

Since I can't inspect those closed-source lines, I believe the data lineage operator doesn't support BigQueryInsertJobOperator with external tableDefinitions (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration).
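
For illustration, here is a minimal sketch of what I suspect the closed-source lineage code does, reconstructed from the traceback (the function and field names are my assumptions): it treats every input table as a TableReference-style dict, while an entry from tableDefinitions is an ExternalDataConfiguration that has no datasetId/tableId keys.

# Hypothetical reconstruction from the traceback above, not the actual
# closed-source airflow.composer.data_lineage module.
def build_inlets(input_tables, default_project="my-project"):
    # Assumes every input table is a TableReference-style dict.
    return [
        {
            "project_id": table.get("projectId", default_project),
            "dataset_id": table["datasetId"],  # <- the KeyError site
            "table_id": table["tableId"],
        }
        for table in input_tables
    ]

# An external table coming from "tableDefinitions" is an
# ExternalDataConfiguration instead, so the key lookup fails:
external_table = {
    "sourceUris": ["gs://bucket/input/file.csv"],
    "sourceFormat": "CSV",
    "autodetect": True,
}
build_inlets([external_table])  # raises KeyError: 'datasetId'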

Is there a workaround?

There is 1 answer below

Antony Wan

I don't know whether you've found an answer yet. I recently faced the same issue and got a solution from a Google software engineer: all I had to do was disable the Dataplex data lineage integration in the Composer environment's configuration.
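
For reference, assuming your gcloud version supports the flag (recent releases do), the same toggle can be applied from the command line; ENVIRONMENT_NAME and LOCATION are placeholders:

# Turns off the Dataplex data lineage integration for the environment
gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

Keep in mind this disables lineage reporting for the whole environment, not just for this one operator.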