I need to retrieve the output of a bash command (which will be the size of a file) in an SSHOperator. I will use this value as a condition check to branch out to other tasks. I'm using XCom to try to retrieve the value and a BranchPythonOperator to handle the decision, but I've been quite unsuccessful. The SSHOperator doesn't seem to push its value into XCom.
The following is my code:
# Required packages to execute DAG
from __future__ import print_function
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

log = logging.getLogger(__name__)

def decision_function(**context):
    ti = context['ti']
    fileSize = ti.xcom_pull(task_ids='get_param')
    log.info('File size is: {}'.format(fileSize))
    if fileSize >= 800000:
        return 'good_path'
    else:
        return 'bad_path'
# DAG parameters
default_args = {
    'owner': 'Me',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 17, 4, 15),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': False,
    'email_on_retry': False,
    'provide_context': True,
    'orientation': 'LR'  # other options: TB, RL, BT
}
# create DAG object with name and default_args
with DAG(
        'a_param',
        schedule_interval=None,
        description='params',
        default_args=default_args
) as dag:

    # Define tasks
    begin = DummyOperator(
        task_id='begin',
        dag=dag
    )

    get_param = SSHOperator(
        ssh_conn_id="oci_connection",
        task_id='get_param',
        xcom_push=True,
        command="ls -ltr /tmp/adobegc.log | awk '{print $5}'",
        dag=dag)

    check_file = BranchPythonOperator(
        task_id='check_file',
        python_callable=decision_function,
        provide_context=True,
        dag=dag)

    good_path = DummyOperator(
        task_id='good_path',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    bad_path = DummyOperator(
        task_id='bad_path',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    begin >> get_param >> check_file
    check_file >> good_path
    check_file >> bad_path
The check_file task fails with the following log:
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file /usr/local/lib/airflow/airflow/contrib/operators/ssh_operator.py:75: PendingDeprecationWarning: Invalid arguments were passed to SSHOperator (task_id: get_param). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file *args: ()
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file **kwargs: {'xcom_push': True}
[2020-10-12 16:58:20,573] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file super(SSHOperator, self).__init__(*args, **kwargs)
[2020-10-12 16:58:20,906] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file Running <TaskInstance: a_param.check_file 2020-10-12T16:55:18.312240+00:00 [running]> on host airflow-worker-9b6fbd84c-l4jbs
[2020-10-12 16:58:20,990] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file [2020-10-12 16:58:20,989] {a_param.py:25} INFO - File size is: None
[2020-10-12 16:58:20,990] {taskinstance.py:1135} ERROR - '>=' not supported between instances of 'NoneType' and 'int'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 141, in execute
    branch = super(BranchPythonOperator, self).execute(context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/a_param.py", line 26, in decision_function
    if fileSize >= 800000:
TypeError: '>=' not supported between instances of 'NoneType' and 'int'
get_param task log snippet:
[2020-10-12 16:57:12,113] {base_task_runner.py:113} INFO - Job 111303: Subtask get_param [2020-10-12 16:57:12,112] {ssh_operator.py:109} INFO - Running command: ls -ltr /tmp/adobegc.log | awk '{print $5}'
[2020-10-12 16:57:12,549] {base_task_runner.py:113} INFO - Job 111303: Subtask get_param [2020-10-12 16:57:12,547] {ssh_operator.py:143} INFO - 516752
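So the command clearly runs and prints the size (516752), but nothing shows up for the check_file task to pull. For completeness, this is roughly what I expect decision_function to end up looking like once the value actually lands in XCom. It assumes the SSHOperator pushes its stdout base64-encoded (which, as far as I understand, depends on the enable_xcom_pickling setting), so the decode step and the plain-string fallback are guesses on my part; log is the module-level logger from the DAG file above.

import base64

def decision_function(**context):
    ti = context['ti']
    raw = ti.xcom_pull(task_ids='get_param')
    if raw is None:
        # Nothing was pushed to XCom by get_param
        raise ValueError('No XCom value found for task get_param')
    try:
        # Assumption: the SSHOperator pushes stdout base64-encoded
        file_size = int(base64.b64decode(raw).decode('utf-8').strip())
    except (ValueError, TypeError):
        # Fallback: treat the XCom value as a plain numeric string
        file_size = int(str(raw).strip())
    log.info('File size is: {}'.format(file_size))
    if file_size >= 800000:
        return 'good_path'
    return 'bad_path'

But before any of that matters, I need to understand why get_param isn't writing anything to XCom in the first place.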