I am using Airflow 2.7.1 and am trying to copy a set of files from a remote host to my local machine. I connect to the remote host over SSH and copy the files with SFTPOperator, but I keep getting this error:

raise AirflowException(f"Error while transferring {file_msg}, error: {e}")

from datetime import datetime

from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator

default_args = {}  # the real default_args were defined elsewhere and are not shown

with DAG(
    dag_id='task_flow_v1',
    default_args=default_args,
    description='This is samplefile dag',
    start_date=datetime(2023, 10, 9),
    schedule_interval=None,
    tags=["pratice"],
) as dag:
    remote_directory = 'C:\\Users\\xxxx\\Downloads\\"Crystal Report Sample Files"\\"Crystal Report Sample Files"'
    local_directory = '/mnt/d/New_folder'

    sftp_task = SFTPOperator(
        task_id='sftp_task',
        ssh_conn_id='demo_windows_sshconn',  # Specify your SSH connection ID
        local_filepath=local_directory,
        remote_filepath=remote_directory,
        operation="get",  # Use 'get' for downloading from the remote server
        create_intermediate_dirs=True,
    )

sftp_task

I also tried copying the files with scp:

copy_files_task = SSHOperator(
    task_id='copy_files_from_remote',
    ssh_conn_id='demo_windows_sshconn',
    command=f'scp -r [email protected]:"{remote_directory}" "{local_directory}"',
)

But that does not copy anything either. Any idea how to copy a set of files from a remote host to the local machine in Airflow? Thanks in advance.

Answer by Matthias Huschle:

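Your first example uses SFTPOperator, which, as far as I know, does not copy whole directories, only individual files or lists of files. Your second example cannot work because SSHOperator executes its command on the remote machine: scp therefore runs on the remote host, and the destination path is resolved there rather than on the machine running Airflow.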
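For the list form mentioned above, here is a minimal sketch, assuming your sftp provider version accepts lists for both local_filepath and remote_filepath and pairs them by position. build_transfer_lists is a hypothetical helper (not part of Airflow), the example paths and file names are made up, and the file names have to be known when the DAG is parsed.

```python
import posixpath


def build_transfer_lists(remote_dir, local_dir, filenames):
    """Build parallel lists of remote and local paths (hypothetical helper).

    The i-th remote path is meant to be downloaded to the i-th local path,
    so both lists share the same order and length.
    """
    # SFTP paths use forward slashes; the local side (/mnt/d/...) is POSIX too.
    remote_paths = [posixpath.join(remote_dir, name) for name in filenames]
    local_paths = [posixpath.join(local_dir, name) for name in filenames]
    return remote_paths, local_paths


# Hypothetical example values; replace with your own directories and files.
remote_paths, local_paths = build_transfer_lists(
    "/remote/reports", "/mnt/d/New_folder", ["sample1.rpt", "sample2.rpt"]
)
print(remote_paths)  # ['/remote/reports/sample1.rpt', '/remote/reports/sample2.rpt']
print(local_paths)   # ['/mnt/d/New_folder/sample1.rpt', '/mnt/d/New_folder/sample2.rpt']
```

The two lists would then be passed as remote_filepath=remote_paths and local_filepath=local_paths together with operation="get". Since SFTP does not go through a shell, quotes inside a path (like the "Crystal Report Sample Files" parts of remote_directory) are sent literally as part of the name, so spaces should be left unquoted there.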

My suggestion would be to create your own operator based on SFTPOperator that takes directories instead of local_filepath and remote_filepath. The method you need to override is SFTPOperator.execute (see https://github.com/apache/airflow/blob/b8c416681c529aad3ef744c193f6e0435c4d0d93/airflow/providers/sftp/operators/sftp.py#L109). Which object is used for the SFTP operations depends on your exact Airflow version, but it should provide methods for listing remote directories, e.g. https://github.com/apache/airflow/blob/b8c416681c529aad3ef744c193f6e0435c4d0d93/airflow/providers/sftp/hooks/sftp.py#L34 .
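A rough sketch of the directory-copy logic such an execute override could call, assuming the hook object exposes list_directory(path) and retrieve_file(remote_full_path, local_full_path) as SFTPHook does (check the hook source for your version). download_directory is a hypothetical name; it is not recursive and assumes the remote directory contains regular files only. It takes the hook as an argument so the logic can be exercised without a live SFTP server.

```python
import os
import posixpath


def download_directory(hook, remote_dir, local_dir):
    """Download every entry of remote_dir into local_dir (not recursive).

    `hook` is assumed to behave like an SFTPHook and provide:
      - list_directory(path) -> list of entry names in that directory
      - retrieve_file(remote_full_path, local_full_path)
    Subdirectories are not handled; the remote directory is assumed to
    contain regular files only.
    """
    # Create the local target directory if it does not exist yet.
    os.makedirs(local_dir, exist_ok=True)

    copied = []
    for name in hook.list_directory(remote_dir):
        # SFTP paths use forward slashes, so join the remote side with posixpath.
        remote_path = posixpath.join(remote_dir, name)
        local_path = os.path.join(local_dir, name)
        hook.retrieve_file(remote_path, local_path)
        copied.append(local_path)
    return copied
```

Inside a subclass of SFTPOperator, execute() could build the hook (for example SFTPHook(ssh_conn_id=...) on recent sftp provider versions; older versions used a different connection argument) and call download_directory(hook, remote_dir, local_dir) with the directory arguments your operator accepts.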