I am trying to push the results of SQL commands from an Airflow SQL hook into XCom. I am able to view the command results in the log, but they are not pushed to XCom.
```python
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SqlExecuteOperator(BaseOperator):
    template_fields = ('sql',)
    template_ext = ('.hql', '.sql',)
    ui_color = '#fff7e6'

    @apply_defaults
    def __init__(
            self, sql,
            conn_id=None,
            database=None,
            *args, **kwargs):
        super(SqlExecuteOperator, self).__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.sql = sql
        self.database = database

    def execute(self, **kwargs):
        self.log.info('Executing SQL statement: ' + self.sql)
        records = self.get_db_hook().get_first(self.sql)
        self.log.info("Record: " + str(records))
        return int(records[0])

    def get_db_hook(self):
        conn = BaseHook.get_connection(conn_id=self.conn_id)
        hook = BaseHook.get_hook(conn_id=self.conn_id)
        hook.connection = conn
        if self.database:
            hook.schema = self.database
        return hook
```
The operator is used as follows:
```python
@task
def get_results_from_sql(**kwargs):
    sql_task_op = SqlExecuteOperator(
        task_id="sql_task",
        conn_id=SQL_CONNECTION,
        sql="SELECT 1",
        database='TEST',
        do_xcom_push=True,
    )
    sql_task_op.execute(context=dict())
```
Please find the log details below:
```
95648bf8cf8b
*** Found local files:
***   * /opt/airflow/logs/dag_id=SQL_TEST/run_id=manual__2023-12-22T03:53:23.673472+00:00/task_id=get_results_from_sql/attempt=1.log
[2023-12-21, 22:53:24 EST] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: SQL_TEST.get_results_from_sql manual__2023-12-22T03:53:23.673472+00:00 [queued]>
[2023-12-21, 22:53:24 EST] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: SQL_TEST.get_results_from_sql manual__2023-12-22T03:53:23.673472+00:00 [queued]>
[2023-12-21, 22:53:24 EST] {taskinstance.py:1361} INFO - Starting attempt 1 of 2
[2023-12-21, 22:53:24 EST] {taskinstance.py:1382} INFO - Executing <Task(_PythonDecoratedOperator): get_results_from_sql> on 2023-12-22 03:53:23.673472+00:00
[2023-12-21, 22:53:24 EST] {standard_task_runner.py:57} INFO - Started process 605102 to run task
[2023-12-21, 22:53:24 EST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'SQL_TEST', 'get_results_from_sql', 'manual__2023-12-22T03:53:23.673472+00:00', '--job-id', '266440', '--raw', '--subdir', 'DAGS_FOLDER/sql_test.py', '--cfg-path', '/tmp/tmp0b98o6o4']
[2023-12-21, 22:53:24 EST] {standard_task_runner.py:85} INFO - Job 266440: Subtask get_results_from_sql
[2023-12-21, 22:53:24 EST] {task_command.py:416} INFO - Running <TaskInstance: SQL_TEST.get_results_from_sql manual__2023-12-22T03:53:23.673472+00:00 [running]> on host 95648bf8cf8b
[2023-12-21, 22:53:25 EST] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='SQL_TEST' AIRFLOW_CTX_TASK_ID='get_results_from_sql' AIRFLOW_CTX_EXECUTION_DATE='2023-12-22T03:53:23.673472+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-12-22T03:53:23.673472+00:00'
[2023-12-21, 22:53:25 EST] {sql_test.py:30} INFO - Executing SQL statement: SELECT 1
[2023-12-21, 22:53:25 EST] {base.py:73} INFO - Using connection ID 'MS_TES' for task execution.
[2023-12-21, 22:53:25 EST] {sql.py:418} INFO - Running statement: SELECT 1, parameters: None
[2023-12-21, 22:53:25 EST] {sql_test.py:32} INFO - Record: (1,)
[2023-12-21, 22:53:25 EST] {python.py:194} INFO - Done. Returned value was: None
[2023-12-21, 22:53:25 EST] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=SQL_TEST, task_id=get_results_from_sql, execution_date=20231222T035323, start_date=20231222T035324, end_date=20231222T035325
[2023-12-21, 22:53:25 EST] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-12-21, 22:53:25 EST] {taskinstance.py:2778} INFO - 1 downstream tasks scheduled from follow-on schedule check
```
I am not sure what I am missing. If anyone has faced a similar issue, please suggest a solution.
As has been mentioned already, you should not be calling `execute` methods directly. Airflow calls these itself and uses their return values to create the XCom messages.

## MsSqlOperator demo DAG
The simplest usage is to return the raw result sets from `MsSqlOperator`, such as in this `dags/mssql_operator_xcom_dag.py` file. After executing this DAG you'll see the XCom messages output from the `query_results` and `display_results` tasks.
## MsSqlHook demo DAG

`MsSqlOperator` calls `MsSqlHook` to do most of the work, so you can restructure things slightly to get exactly the same results using `MsSqlHook` directly, such as in this `dags/mssql_hook_xcom_dag.py` file. After executing this DAG you'll see the same XCom messages output from the `query_results` and `display_results` tasks.

Of the two approaches, `MsSqlHook` can be more useful as it gives you the opportunity to return the results in different formats, such as returning a DataFrame by using `mssql.get_pandas_df(sql=sql)` instead.
## Orchestrating The Orchestrator

The above DAGs can be demonstrated quickly in Docker containers, based on Apache's demo compose file from https://airflow.apache.org/docs/apache-airflow/2.8.0/docker-compose.yaml with the addition of a SQL Server 2019 container to test `MsSqlOperator` and `MsSqlHook`:
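The extra service added alongside the stock Airflow services might look something like this (the image tag, password and port mapping are illustrative, not taken from the original post):

```yaml
# Added under the existing `services:` key of docker-compose.yaml.
  mssql:
    image: mcr.microsoft.com/mssql/server:2019-latest
    environment:
      ACCEPT_EULA: "Y"
      MSSQL_SA_PASSWORD: "YourStrong!Passw0rd"   # illustrative only
    ports:
      - "1433:1433"
```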
And a simple `up.sh` script to start the composition, install the required Airflow providers and create the Connection that's used in the above DAGs:
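A hedged sketch of what such a script might do; the provider package is real, but the connection details mirror the illustrative compose values above rather than the original post:

```bash
#!/usr/bin/env bash
set -euo pipefail

# The stock compose file installs anything listed in
# _PIP_ADDITIONAL_REQUIREMENTS when the Airflow containers start.
export _PIP_ADDITIONAL_REQUIREMENTS="apache-airflow-providers-microsoft-mssql"

docker compose up -d

# Create the Connection the demo DAGs reference (details are illustrative).
docker compose exec airflow-webserver airflow connections add airflow-mssql \
  --conn-type mssql \
  --conn-host mssql \
  --conn-port 1433 \
  --conn-login sa \
  --conn-password 'YourStrong!Passw0rd' \
  --conn-schema master
```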
And, finally, a `down.sh` script to tear down the Docker containers and images:
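Such a teardown script could be as small as a single `docker compose down`; the flags shown here (removing named volumes and locally built images as well) are an assumption about its contents:

```bash
#!/usr/bin/env bash
set -euo pipefail

# Stop and remove the containers, their named volumes, and local images.
docker compose down --volumes --rmi local
```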