Airflow PostgresOperator: Task failed with exception while using postgres_conn_id="redshift"

Airflow version: 2.1.2, Python 3.8

I am trying to execute some basic queries on my Redshift cluster from a DAG, but the task is failing with an exception whose root cause isn't clear from the logs.

import datetime
import logging

from airflow import DAG
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator

import sql_statements

def load_data_to_redshift(*args, **kwargs):
    aws_hook = AwsHook("aws_credentials")
    credentials = aws_hook.get_credentials()
    redshift_hook = PostgresHook("redshift")
    sql_stmt = sql_statements.COPY_ALL_data_SQL.format(
        credentials.access_key,
        credentials.secret_key,
    )
    redshift_hook.run(sql_stmt)


dag = DAG(
    'exercise1',
    start_date=datetime.datetime.now()
)

create_t1_table = PostgresOperator(
    task_id="create_t1_table",
    dag=dag,
    postgres_conn_id="redshift_default",
    sql=sql_statements.CREATE_t1_TABLE_SQL,
)

create_t2_table = PostgresOperator(
    task_id="create_t2_table",
    dag=dag,
    postgres_conn_id="redshift_default",
    sql=sql_statements.CREATE_t2_TABLE_SQL,
)

create_t1_table >> create_t2_table
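
For context, the sql_statements module just holds the SQL text; it looks roughly like the following (the table columns and the S3 path are simplified placeholders, not the real ones):

CREATE_t1_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS t1 (
    id   INTEGER NOT NULL,
    name VARCHAR(256)
);
"""

CREATE_t2_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS t2 (
    id    INTEGER NOT NULL,
    value VARCHAR(256)
);
"""

# Two positional placeholders are filled with the AWS access key and
# secret key by load_data_to_redshift() via str.format().
COPY_ALL_data_SQL = """
COPY t1
FROM 's3://my-bucket/data/'
ACCESS_KEY_ID '{}'
SECRET_ACCESS_KEY '{}'
JSON 'auto';
"""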

Following is the exception:

[2021-09-17 05:23:33,902] {base.py:69} INFO - Using connection to: id: redshift_default. Host: rdscluster.123455.us-west-2.redshift.amazonaws.com, Port: 5439, Schema: udac, Login: ***, Password: ***, extra: {}
[2021-09-17 05:23:33,903] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/8085/.local/lib/python3.8/site-packages/airflow/providers/postgres/operators/postgres.py", line 70, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/home/8085/.local/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 177, in run
    with closing(self.get_conn()) as conn:
  File "/home/8085/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 115, in get_conn
    self.conn = psycopg2.connect(**conn_args)
  File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
    conn = psycopg2.connect("dbname=airflow user=abc password=ubantu host=127.0.0.1 port=5432")
  File "/home/8085/.local/lib/python3.8/site-packages/psycopg2abc/__init__.py", line 124, in connect
    conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")
  File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
    conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")
  [Previous line repeated 974 more times]
RecursionError: maximum recursion depth exceeded
[2021-09-17 05:23:33,907] {taskinstance.py:1544} INFO - Marking task as FAILED. dag_id=exercise1, task_id=create_t1_table, execution_date=20210917T092331, start_date=20210917T092333, end_date=20210917T092333
[2021-09-17 05:23:33,953] {local_task_job.py:149} INFO - Task exited with return code 1

I can't tell from the logs what is going wrong here. It appears that even after providing the Redshift connection ID, the PostgresOperator is using the default Postgres connection configured when the Airflow webserver was installed, but I could be wrong. Any idea how I can resolve this or get more logging out of Airflow? (Note: I already tried different Airflow log levels in the Airflow config; that didn't help either.)

The redshift connection is defined properly, and I can connect to Redshift from a standalone Python utility as well as from psql, so there is no issue with the Redshift cluster itself.
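
For reference, the standalone check is nothing fancy, roughly the snippet below (dbname, user, and password are placeholders here), and it connects and runs queries without any problem:

import psycopg2

# Same endpoint as configured in the "redshift" Airflow connection; credentials masked.
conn = psycopg2.connect(
    dbname="udac",
    user="awsuser",
    password="***",
    host="rdscluster.123455.us-west-2.redshift.amazonaws.com",
    port=5439,
)
with conn.cursor() as cur:
    cur.execute("SELECT 1;")
    print(cur.fetchone())
conn.close()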

-Thanks,

There is 1 answer below.

Resolved: Somehow the following file was referring to the Airflow metadata Postgres DB created during the Airflow installation rather than connecting to the configured Redshift cluster.

  File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
    conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")

I had to recreate the Airflow metadata DB from scratch to resolve the issue.
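
After recreating it, a quick sanity check along these lines (a sketch, run on the Airflow host) should show the hook resolving to the Redshift endpoint rather than the local metadata DB; get_uri() and get_first() are the generic DbApiHook helpers:

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Verify that the "redshift" connection now resolves to the cluster endpoint
# and not to the local Airflow metadata database.
hook = PostgresHook(postgres_conn_id="redshift")
print(hook.get_uri())               # expect the Redshift host on port 5439
print(hook.get_first("SELECT 1;"))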