I'm trying to use a `SqlSensor` task in my DAG, but I'm getting an `Unknown hook type "postgres"` error. I want each daily run to re-check a DB table and see whether new rows were added since the previous run.
I'm running Airflow 2.8.1 on Ubuntu and have installed `apache-airflow-providers-postgres` and `apache-airflow-providers-common-sql`. This is the sensor that checks for new rows in the table:
```python
from airflow.providers.common.sql.sensors.sql import SqlSensor

# _success_criteria is defined elsewhere in the DAG file (not shown here)
sql_sensor = SqlSensor(
    task_id="sql_sensor",
    conn_id="pg_conn",
    success=_success_criteria,
    sql="SELECT COUNT(*) FROM public.intradaytrades WHERE timestamp > CURRENT_DATE - INTERVAL '1 day';",
    mode="reschedule",
    fail_on_empty=True,
    timeout=60 * 60,  # 1 hour timeout for the sensor
    poke_interval=60 * 5,  # 5 minutes between pokes
)
```
The `pg_conn` connection is set up in Airflow Connections with the "Postgres" connection type, and testing it succeeds.
However, I get this error when the sensor task runs:
```text
[2024-02-04, 01:10:25 UTC] {base.py:83} INFO - Using connection ID 'pg_conn' for task execution.
[2024-02-04, 01:10:25 UTC] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/sensors/base.py", line 265, in execute
    raise e
  File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/sensors/base.py", line 247, in execute
    poke_return = self.poke(context)
                  ^^^^^^^^^^^^^^^^^^
  File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/providers/common/sql/sensors/sql.py", line 93, in poke
    hook = self._get_hook()
           ^^^^^^^^^^^^^^^^
  File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/providers/common/sql/sensors/sql.py", line 84, in _get_hook
    hook = conn.get_hook(hook_params=self.hook_params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/models/connection.py", line 363, in get_hook
    raise AirflowException(f'Unknown hook type "{self.conn_type}"')
airflow.exceptions.AirflowException: Unknown hook type "postgres"
```
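The last frame shows the exception coming from `Connection.get_hook`, which in Airflow 2.8 (as far as I can tell from the source) asks the providers manager for the hook registered under the connection type and raises when nothing usable comes back. The sketch below repeats that lookup so it can be run with the same Python interpreter and `AIRFLOW_HOME` as the worker. The helper name `describe_hook_lookup` is made up for illustration, and it assumes `ProvidersManager().hooks` behaves like a mapping from connection type to a hook-info record (with `hook_class_name` and `package_name` fields) or `None`.

```python
def describe_hook_lookup(conn_type: str = "postgres") -> str:
    """Hypothetical helper: repeat the registry lookup behind 'Unknown hook type'."""
    try:
        # The registry that Connection.get_hook appears to consult in Airflow 2.8.
        from airflow.providers_manager import ProvidersManager
    except ImportError as exc:
        return f"cannot check {conn_type!r}: airflow is not importable here ({exc})"

    # Connection type -> hook info; None means no usable hook is registered.
    hook_info = ProvidersManager().hooks.get(conn_type)
    if hook_info is None:
        return f"no hook registered for {conn_type!r} (the condition that raises the error)"
    return f"{conn_type!r} -> {hook_info.hook_class_name} from {hook_info.package_name}"


if __name__ == "__main__":
    print(describe_hook_lookup("postgres"))
```

If this reports no hook for `'postgres'` when run in the worker's environment, while `PostgresHook` still imports fine there, the provider is presumably installed but not being registered by the providers manager in that environment.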
What could be causing this failure? I've tested `pg_conn` by using `PostgresHook` directly in a custom query task, and I can fetch rows from the DB with `fetchall()`.
Installed packages:
```text
apache-airflow==2.8.1
apache-airflow-providers-celery==3.5.2
apache-airflow-providers-common-io==1.2.0
apache-airflow-providers-common-sql==1.10.1
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-github==2.5.1
apache-airflow-providers-http==4.8.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-winrm==3.4.0
apache-airflow-providers-postgres==5.10.0
apache-airflow-providers-redis==3.6.0
apache-airflow-providers-slack==8.6.0
apache-airflow-providers-sqlite==3.7.0
apache-airflow-providers-ssh==3.10.0
```
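The list above presumably comes from `pip freeze` in a shell. A small stdlib-only check, run by the interpreter that actually executes the tasks (for example from a throwaway task, which is an assumption about how you would run it), can confirm that this interpreter sees the same distributions and can locate the modules in the traceback. The distribution and module names are taken from the list and the traceback; the helper names are hypothetical.

```python
import importlib.util
import sys
from importlib.metadata import PackageNotFoundError, version

# Distributions from the list above that the SqlSensor code path relies on.
PACKAGES = (
    "apache-airflow",
    "apache-airflow-providers-common-sql",
    "apache-airflow-providers-postgres",
)

# Modules named in the traceback or providing PostgresHook.
MODULES = (
    "airflow.providers.common.sql.sensors.sql",
    "airflow.providers.postgres.hooks.postgres",
)


def installed_version(dist: str) -> "str | None":
    """Version of an installed distribution, or None if this interpreter can't see it."""
    try:
        return version(dist)
    except PackageNotFoundError:
        return None


def module_visible(name: str) -> bool:
    """True if the module can be located; parent packages get imported as a side effect."""
    try:
        return importlib.util.find_spec(name) is not None
    except ImportError:
        # A parent package (e.g. airflow itself) is missing from this interpreter.
        return False


def report() -> "list[str]":
    lines = [f"interpreter: {sys.executable}"]
    lines += [f"{dist}: {installed_version(dist)}" for dist in PACKAGES]
    lines += [f"{mod}: {'found' if module_visible(mod) else 'missing'}" for mod in MODULES]
    return lines


if __name__ == "__main__":
    print("\n".join(report()))
```

Comparing this output from inside a task with the `pip freeze` list shows whether the worker really runs from the `/var/lib/airflow/python_3_11_airflow` environment seen in the traceback.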