I want a single DAG that downloads data from an FTP server. I don't need all the data on the server, just certain files. The files are uploaded daily at fixed times throughout the day, and I want to retrieve them shortly after they become available on the FTP site.
Example FTP schedule:
/Data/US/XASE/yyyymmdd.csv #uploaded daily at 9:00 PM UTC
/Data/EU/TRWB/yyyymmdd.csv #uploaded daily at 1:00 PM UTC
...
/Data/EU/XEUR/yyyymmdd.csv #uploaded daily at 11:00 AM UTC
How can I set the schedule in the DAG so that the files are copied from the FTP site as they are uploaded, without having a separate DAG for each upload time?
I think you have three options for scheduling here.
Option 1
Run exactly at 11 AM, 1 PM and 9 PM UTC with the following schedule:
0 11,13,21 * * *
Or run 5 minutes after the full hour to add some buffer:
5 11,13,21 * * *
Option 2
Run the DAG more frequently, check within the task whether the files are available, and download them if they are. This makes sense if there is a higher chance that a file upload is delayed.
For example,
*/10 10-22 * * *
would run every 10 minutes from 10:00 to 22:50.
Option 3
Schedule the DAG once per day (@daily) and then work with TimeDeltaSensor. I think this option is the least preferable, as you have a lot of tasks just "waiting", which can block the execution of other Airflow tasks.
Besides that, it also depends heavily on how you want to handle the download from the FTP itself.
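For the polling approach in Option 2, the download step can first check whether the file is already on the server. Here is a minimal sketch using the standard-library ftplib. The helper names file_available and download_if_available are made up for illustration, and it assumes the server supports the NLST command:

```python
import ftplib
import posixpath


def file_available(ftp, remote_path):
    """Return True if remote_path appears in its directory listing."""
    directory, name = posixpath.split(remote_path)
    try:
        listing = ftp.nlst(directory)
    except ftplib.error_perm:
        # Some servers answer an empty or missing directory with a 550 error.
        return False
    # Servers differ: NLST may return bare file names or full paths.
    return name in {posixpath.basename(entry) for entry in listing}


def download_if_available(ftp, remote_path, local_path):
    """Download remote_path to local_path if it exists; report whether it did."""
    if not file_available(ftp, remote_path):
        return False
    with open(local_path, "wb") as handle:
        ftp.retrbinary(f"RETR {remote_path}", handle.write)
    return True
```

Here ftp would be a logged-in ftplib.FTP connection. A task that gets False back can simply finish and let the next scheduled run try again.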
I guess you could create one task per file to download each day and put a BranchPythonOperator task in front of them to avoid trying to download the same file multiple times.
Or you put the whole logic into a single PythonOperator, including logic that downloads only certain files based on execution_date.
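The selection logic for the single-task variant could look roughly like the sketch below. It is plain Python so it can be tried without Airflow. The names UPLOAD_SCHEDULE and files_due are made up for illustration, and only the three paths listed in the question are included. Note that in Airflow the execution_date marks the start of the schedule interval, not the moment the run actually starts, so you may want to pass the actual run time (in UTC) instead:

```python
from datetime import datetime, time, timezone

# Upload schedule taken from the question: (directory, upload time in UTC).
UPLOAD_SCHEDULE = [
    ("/Data/EU/XEUR", time(11, 0)),
    ("/Data/EU/TRWB", time(13, 0)),
    ("/Data/US/XASE", time(21, 0)),
]


def files_due(now, already_downloaded=()):
    """Return today's paths whose upload time has passed and that are not done yet."""
    day = now.strftime("%Y%m%d")
    done = set(already_downloaded)
    due = []
    for directory, upload_time in UPLOAD_SCHEDULE:
        path = f"{directory}/{day}.csv"
        if now.time() >= upload_time and path not in done:
            due.append(path)
    return due


if __name__ == "__main__":
    run_time = datetime(2024, 1, 15, 13, 5, tzinfo=timezone.utc)
    print(files_due(run_time))
    # ['/Data/EU/XEUR/20240115.csv', '/Data/EU/TRWB/20240115.csv']
```

Passing the paths that were already fetched as already_downloaded keeps a later run from downloading the same file again, which is the job the BranchPythonOperator would do in the multi-task variant.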