Retrieve and pass the result of an Airflow SSHOperator task to another task?

I need to retrieve the output of a bash command (which will be the size of a file) run through an SSHOperator, and use that value as a condition check to branch out to other tasks. I'm using XCom to try to retrieve the value and a BranchPythonOperator to handle the decision, but I've been quite unsuccessful: the SSHOperator doesn't seem to push its value into XCom.

The following is my code:

# Required packages to execute the DAG

from __future__ import print_function
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

log = logging.getLogger(__name__)

def decision_function(**context):
    ti = context['ti']
    fileSize = ti.xcom_pull(task_ids='get_param')
    log.info('File size is: {}'.format(fileSize))
    if fileSize >= 800000:
        return 'good_path'
    else:
        return 'bad_path'

# DAG parameters

default_args = {
    'owner': 'Me',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 17, 4, 15),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': False,
    'email_on_retry': False,
    'provide_context': True,
    'orientation': 'LR'  # or TB, RL, BT
}

# create DAG object with Name and default_args
with DAG(
    'a_param',
    schedule_interval=None,
    description='params',
    default_args=default_args
    ) as dag:

    # Define tasks
    begin = DummyOperator(
        task_id='begin',
        dag=dag
    )

    get_param = SSHOperator(
        ssh_conn_id="oci_connection",
        task_id='get_param',
        xcom_push=True,
        command="ls -ltr /tmp/adobegc.log | awk '{print $5}'",
        dag=dag)

    check_file = BranchPythonOperator(
        task_id='check_file',
        python_callable=decision_function,
        provide_context=True,
        dag=dag)

    good_path = DummyOperator(
        task_id='good_path',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    bad_path = DummyOperator(
        task_id='bad_path',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    begin >> get_param >> check_file
    check_file >> good_path
    check_file >> bad_path

The check_file task fails with the following log:

[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file /usr/local/lib/airflow/airflow/contrib/operators/ssh_operator.py:75: PendingDeprecationWarning: Invalid arguments were passed to SSHOperator (task_id: get_param). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file *args: ()
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file **kwargs: {'xcom_push': True}
[2020-10-12 16:58:20,573] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file   super(SSHOperator, self).__init__(*args, **kwargs)
[2020-10-12 16:58:20,906] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file Running <TaskInstance: a_param.check_file 2020-10-12T16:55:18.312240+00:00 [running]> on host airflow-worker-9b6fbd84c-l4jbs
[2020-10-12 16:58:20,990] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file [2020-10-12 16:58:20,989] {a_param.py:25} INFO - File size is: None
[2020-10-12 16:58:20,990] {taskinstance.py:1135} ERROR - '>=' not supported between instances of 'NoneType' and 'int'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 141, in execute
    branch = super(BranchPythonOperator, self).execute(context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/a_param.py", line 26, in decision_function
    if fileSize >= 800000:
TypeError: '>=' not supported between instances of 'NoneType' and 'int'
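
Judging by the PendingDeprecationWarning at the top of that log, the xcom_push=True argument I passed is not recognised and is simply discarded, which would explain why xcom_pull returns None. If I'm reading the SSHOperator in this 1.10.x release correctly, the accepted spelling is the BaseOperator argument do_xcom_push, so I assume the task definition would need to be (untested):

    get_param = SSHOperator(
        ssh_conn_id="oci_connection",
        task_id='get_param',
        do_xcom_push=True,  # assumption: the accepted argument, vs. the rejected xcom_push
        command="ls -ltr /tmp/adobegc.log | awk '{print $5}'",
        dag=dag)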

Snippet from the get_param task log:

[2020-10-12 16:57:12,113] {base_task_runner.py:113} INFO - Job 111303: Subtask get_param [2020-10-12 16:57:12,112] {ssh_operator.py:109} INFO - Running command: ls -ltr /tmp/adobegc.log | awk '{print $5}'
[2020-10-12 16:57:12,549] {base_task_runner.py:113} INFO - Job 111303: Subtask get_param [2020-10-12 16:57:12,547] {ssh_operator.py:143} INFO - 516752
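
So the command itself works and prints 516752 on the remote host; the value just never makes it into XCom. One more thing I noticed while reading the source: if enable_xcom_pickling is False in airflow.cfg, SSHOperator base64-encodes the aggregated stdout before pushing it, and the pulled value is a string in any case. So even after fixing the push, I expect the callable needs to decode and cast before comparing, roughly like this (a sketch of my assumption, untested):

from base64 import b64decode

def decision_function(**context):
    ti = context['ti']
    raw = ti.xcom_pull(task_ids='get_param')
    # Assumption: SSHOperator pushed base64-encoded stdout
    # (enable_xcom_pickling = False), so decode and cast to int
    # before the numeric comparison.
    file_size = int(b64decode(raw).decode('utf-8').strip())
    log.info('File size is: {}'.format(file_size))
    if file_size >= 800000:
        return 'good_path'
    return 'bad_path'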