I am using Airflow 2.7.1 and I am trying to copy a set of files from a remote host to my local machine. I connect to the remote host over an SSH connection and copy the files with SFTPOperator, but I keep getting this error:
```
raise AirflowException(f"Error while transferring {file_msg}, error: {e}")
```
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator

# placeholder: define your own default_args here
default_args = {"owner": "airflow"}

with DAG(
    dag_id='task_flow_v1',
    default_args=default_args,
    description='This is a sample file DAG',
    start_date=datetime(2023, 10, 9),
    schedule_interval=None,
    tags=["practice"],
) as dag:
    remote_directory = 'C:\\Users\\xxxx\\Downloads\\"Crystal Report Sample Files"\\"Crystal Report Sample Files"'
    local_directory = '/mnt/d/New_folder'

    sftp_task = SFTPOperator(
        task_id='sftp_task',
        ssh_conn_id='demo_windows_sshconn',  # Specify your SSH connection ID
        local_filepath=local_directory,
        remote_filepath=remote_directory,
        operation="get",  # Use 'get' for downloading from the remote server
        create_intermediate_dirs=True,
    )

    sftp_task
```
I also tried copying the files by running scp through SSHOperator:
```python
copy_files_task = SSHOperator(
    task_id='copy_files_from_remote',
    ssh_conn_id='demo_windows_sshconn',
    command=f'scp -r [email protected]:"{remote_directory}" "{local_directory}"',
)
```
But that doesn't copy any files either. Any idea how to copy a set of files from a remote host to the local machine in Airflow? Thanks in advance.
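Your first example uses SFTPOperator, which, AFAIK, does not copy whole directories, only individual files or lists of files. Your second example runs scp on the remote machine, which cannot work: SSHOperator executes the command on the remote host, so `local_directory` is interpreted as a path on that host, not on your Airflow machine.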
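If you would rather keep scp, the command has to run on the Airflow worker itself (for example as the `bash_command` of a BashOperator) so that it pulls files from the remote host into a local directory. The sketch below only builds such a command; `build_local_scp_command` and `user@remote-host` are hypothetical names. It assumes key-based, non-interactive SSH authentication from the worker and a recent OpenSSH scp (9.0+ uses the SFTP protocol by default), so the remote path only needs quoting for the local shell; older scp versions may need extra remote-side quoting for paths with spaces.

```python
import shlex


def build_local_scp_command(user_host: str, remote_dir: str, local_dir: str) -> str:
    """Build an scp command meant to run on the Airflow worker, not the remote host.

    user_host: hypothetical "user@host" of the remote machine.
    remote_dir: directory on the remote host (no embedded shell quotes).
    local_dir: destination directory on the worker.
    """
    remote = f"{user_host}:{remote_dir}"
    # shlex.quote protects spaces and special characters from the local shell
    return f"scp -r {shlex.quote(remote)} {shlex.quote(local_dir)}"
```

You could then pass the result as `bash_command=build_local_scp_command(...)` to a BashOperator, since BashOperator runs its command on the worker.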
My suggestion would be to create your own operator based on SFTPOperator that takes directories instead of `local_filepath` and `remote_filepath`. The method you need to override is `SFTPOperator.execute` (like https://github.com/apache/airflow/blob/b8c416681c529aad3ef744c193f6e0435c4d0d93/airflow/providers/sftp/operators/sftp.py#L109). What kind of object is used for SFTP operations depends on your exact Airflow version, but it should provide methods for listing remote directories, e.g. https://github.com/apache/airflow/blob/b8c416681c529aad3ef744c193f6e0435c4d0d93/airflow/providers/sftp/hooks/sftp.py#L34 .
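A minimal sketch of the directory-walking logic such a custom operator's `execute` would need. It assumes the hook exposes `list_directory(path)`, `isdir(path)` and `retrieve_file(remote_path, local_path)`, as Airflow's `SFTPHook` does in recent sftp provider versions (check against your installed version). `download_directory` is a hypothetical helper; it takes the hook as a parameter so the logic can be exercised without a live SFTP server.

```python
import os
import posixpath
from typing import List


def download_directory(hook, remote_dir: str, local_dir: str) -> List[str]:
    """Recursively copy remote_dir into local_dir through an SFTP hook.

    Assumes `hook` provides list_directory, isdir and retrieve_file like
    Airflow's SFTPHook. Returns the local paths of the files written.
    """
    os.makedirs(local_dir, exist_ok=True)
    downloaded = []
    for name in hook.list_directory(remote_dir):
        if name in (".", ".."):
            continue  # skip self/parent entries if a server returns them
        remote_path = posixpath.join(remote_dir, name)  # SFTP paths use "/"
        local_path = os.path.join(local_dir, name)
        if hook.isdir(remote_path):
            # Recurse into subdirectories, mirroring the remote tree locally
            downloaded.extend(download_directory(hook, remote_path, local_path))
        else:
            hook.retrieve_file(remote_path, local_path)
            downloaded.append(local_path)
    return downloaded
```

Inside your operator's `execute` you could then create the hook with something like `SFTPHook(ssh_conn_id='demo_windows_sshconn')` and call `download_directory(hook, remote_dir, '/mnt/d/New_folder')`. SFTP paths are not passed through a shell, so the remote path should not contain literal double quotes; Windows OpenSSH servers typically expose paths in the form `/C:/Users/...`.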