Airflow 2.1.2, Python 3.8
I am trying to execute some basic queries on my Redshift cluster using a DAG, but the task is failing with an exception whose underlying cause is not obvious from the logs.
import datetime
import logging

from airflow import DAG
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator

import sql_statements


def load_data_to_redshift(*args, **kwargs):
    aws_hook = AwsHook("aws_credentials")
    credentials = aws_hook.get_credentials()
    redshift_hook = PostgresHook("redshift")
    sql_stmt = sql_statements.COPY_ALL_data_SQL.format(
        credentials.access_key,
        credentials.secret_key,
    )
    redshift_hook.run(sql_stmt)


dag = DAG(
    'exercise1',
    start_date=datetime.datetime.now()
)

create_t1_table = PostgresOperator(
    task_id="create_t1_table",
    dag=dag,
    postgres_conn_id="redshift_default",
    sql=sql_statements.CREATE_t1_TABLE_SQL
)

create_t2_table = PostgresOperator(
    task_id="create_t2_table",
    dag=dag,
    postgres_conn_id="redshift_default",
    sql=sql_statements.CREATE_t2_TABLE_SQL,
)

create_t1_table >> create_t2_table
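For context, the sql_statements module imported above is just a local Python file of SQL strings. A minimal sketch of what it might contain (only the attribute names COPY_ALL_data_SQL, CREATE_t1_TABLE_SQL and CREATE_t2_TABLE_SQL come from the DAG; the table definitions and the S3 path are placeholders):

# sql_statements.py -- hypothetical sketch; the SQL bodies below are
# placeholders, only the variable names are taken from the DAG above.
CREATE_t1_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS t1 (
    id INTEGER NOT NULL,
    value VARCHAR(256)
);
"""

CREATE_t2_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS t2 (
    id INTEGER NOT NULL,
    value VARCHAR(256)
);
"""

# The two positional {} slots are filled with the AWS access key and
# secret key by load_data_to_redshift() above.
COPY_ALL_data_SQL = """
COPY t1 FROM 's3://my-bucket/data'
ACCESS_KEY_ID '{}'
SECRET_ACCESS_KEY '{}'
JSON 'auto';
"""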
Following is the exception:
[2021-09-17 05:23:33,902] {base.py:69} INFO - Using connection to: id: redshift_default. Host: rdscluster.123455.us-west-2.redshift.amazonaws.com, Port: 5439, Schema: udac, Login: ***, Password: ***, extra: {}
[2021-09-17 05:23:33,903] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
result = task_copy.execute(context=context)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/providers/postgres/operators/postgres.py", line 70, in execute
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
File "/home/8085/.local/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 177, in run
with closing(self.get_conn()) as conn:
File "/home/8085/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 115, in get_conn
self.conn = psycopg2.connect(**conn_args)
File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
conn = psycopg2.connect("dbname=airflow user=abc password=ubantu host=127.0.0.1 port=5432")
File "/home/8085/.local/lib/python3.8/site-packages/psycopg2abc/__init__.py", line 124, in connect
conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")
File "/home/8085/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 124, in connect
conn = psycopg2.connect("dbname=airflow user=abc password=abc host=127.0.0.1 port=5432")
[Previous line repeated 974 more times]
RecursionError: maximum recursion depth exceeded
[2021-09-17 05:23:33,907] {taskinstance.py:1544} INFO - Marking task as FAILED. dag_id=exercise1, task_id=create_t1_table, execution_date=20210917T092331, start_date=20210917T092333, end_date=20210917T092333
[2021-09-17 05:23:33,953] {local_task_job.py:149} INFO - Task exited with return code 1
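One thing stands out in the traceback: psycopg2/__init__.py line 124 appears to be calling psycopg2.connect on itself with a hard-coded connection string, which is not what a stock psycopg2 looks like (its connect() is a thin wrapper around the C-level _connect, not a recursive call). A quick way to see where the interpreter imports psycopg2 from and what its connect() actually contains, run in the same environment as the scheduler:

import inspect
import psycopg2

# Path the interpreter is actually importing psycopg2 from.
print(psycopg2.__file__)

# First lines of the connect() in use; a stock build does not call
# psycopg2.connect recursively with hard-coded credentials.
print(inspect.getsource(psycopg2.connect)[:400])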
Other than that, I can't tell from the logs what is going wrong here. It appears that even after providing the Redshift connection ID, the PostgresOperator is using the default Postgres connection configured while installing the Airflow webserver, but I could be wrong. Any idea how I can resolve this, or get more logging out of Airflow? (Note: I already tried different Airflow log levels in the Airflow config; that didn't help either.)
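One way to check what the operator actually resolves, independent of the DAG (this assumes the same providers package shown in the traceback, and it prints credentials in clear text, so it is for local debugging only):

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Resolve the same connection ID the PostgresOperator uses and print
# the URI Airflow would hand to psycopg2.
hook = PostgresHook(postgres_conn_id="redshift_default")
print(hook.get_uri())

The stored connections can also be listed from the CLI with airflow connections list.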
The redshift connection is defined properly, and I can connect to Redshift using another standalone Python utility as well as psql, so there is no issue with the Redshift cluster itself.
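For reference, the standalone check amounts to something like this (host, port and dbname are taken from the log line above; user and password are placeholders):

import psycopg2

# Connect to the cluster directly, bypassing Airflow entirely.
conn = psycopg2.connect(
    host="rdscluster.123455.us-west-2.redshift.amazonaws.com",
    port=5439,
    dbname="udac",
    user="awsuser",   # placeholder
    password="...",   # placeholder
)
with conn.cursor() as cur:
    cur.execute("SELECT 1;")
    print(cur.fetchone())
conn.close()

This connects without errors, which is why I ruled out the cluster.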
-Thanks,
Resolved: somehow the file above was referring to the Airflow Postgres metadata DB created during the Airflow installation rather than connecting to the local Postgres. I had to recreate the Airflow DB from scratch to resolve the issue.
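If anyone else hits this: before rebuilding, it is worth confirming which metadata DB the running Airflow actually points at. On 2.1 the URI still lives under the [core] section (it moved to [database] in later releases), so a quick check is:

from airflow.configuration import conf

# SQLAlchemy URI of the metadata database the scheduler and webserver
# are using; found under [core] on Airflow 2.1.
print(conf.get("core", "sql_alchemy_conn"))

Recreating the metadata DB itself can be done with airflow db reset, which drops and re-initializes all Airflow tables; that is roughly what I did.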