I have a task that I want to execute on a schedule using Airflow. I have Airflow running in Docker using the docker-compose.yaml provided in the Airflow Docker tutorial.
I build the Docker image for the task with docker build -f Dockerfile -t twm_step01 .
My task consists of a bash script which sets up some directories to read from before calling docker run.
So the script below is called ex-my-script.sh, and it reads from another script called config.sh which provides paths to directories that should be read from/written to.
A further script (my-script.sh) is executed inside the Docker container, as shown below. That script in turn executes another bash script, which executes a final bash script that calls a software program installed in the container to write the task's output data.
#!/bin/bash
source scripts/config.sh
in_dir=$event_image_dir
in_ext=$zip_ext
processing_graph_xml=$graph_0
out_dir=$out_step01
out_ext=$dim_ext
lvl_parallelism=$parallel_lvl
data_dir=$data_directory
docker run -it \
    -v $(pwd)/write_storage:$data_directory \
    twm_step01 \
    bash /scripts/my-script.sh \
    $in_dir \
    $in_ext \
    $processing_graph_xml \
    $out_dir \
    $out_ext \
    $lvl_parallelism \
    $data_dir
Here is config.sh, to make things easier to follow:
parallel_lvl=4
local_directory=/opt/airflow/tasks
data_directory=/opt/airflow/tasks/write_storage
zip_ext=.zip
dim_ext=.dim
txt_ext=.txt
tif_ext=.tif
shp_ext=.shp
# step01
event_image_dir=Events_Images/2015
graph_0=/snap_graphs/snap_graph_0.xml
out_step01=step01
And here is the volumes section of the docker-compose.yaml, where I have added local directories. I added the last line with docker.sock because I read this answer: "How to run a docker operator task from airflow which itself runs in a docker container?"
I felt that it aligned with what I was trying to do.
volumes:
  - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
  - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
  - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
  - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  - ${AIRFLOW_PROJ_DIR:-.}/scripts:/opt/airflow/scripts
  - ${AIRFLOW_PROJ_DIR:-.}/src:/opt/airflow/src
  - ${AIRFLOW_PROJ_DIR:-.}/write_storage:/opt/airflow/tasks/write_storage
  - ${AIRFLOW_PROJ_DIR:-.}/snap_graphs:/opt/airflow/snap_graphs
  - /var/run/docker.sock:/var/run/docker.sock
My DAG looks as follows:
import os
from datetime import timedelta, datetime
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator

@dag(
    dag_id="SAR_flooding_demo_docker",
    start_date=datetime(2024, 1, 15),
    schedule="@continuous",
    max_active_runs=1,
    catchup=False,
    default_args={
        "retries": 0,
        "retry_delay": timedelta(minutes=1)
    },
    description="Testing containerized demo",
    tags=["Test"]
)
def demo_runner():
    @task
    def task_01():
        #t1 = BashOperator(
        #    task_id="Task01",
        #    bash_command='/opt/airflow/scripts/ex-my-script.sh ')
        t1 = DockerOperator(
            task_id="Task01",
            image="twm_step01",
            api_version='auto',
            auto_remove=True,
            command='echo "this is a test message shown from within the container"',
            docker_url='unix://var/run/docker.sock',
            network_mode='bridge'
        )
        return
    task_01()

demo_runner()
I have tried both the BashOperator and the DockerOperator. The DAG is scheduled fine and does not fail, but I suspect something is wrong, since the task completes in less than a second. I am also looking for a way to check that the data output by the task is what I expect it to be.
I am very new to Airflow and Docker, so I am just trying anything I can think of.
I have put the paths to where my bash script and input/output data are located (or should be located) on my local machine in the volumes: section of the docker-compose.yaml.
If the task cannot execute the bash script, why doesn't it fail? And if it can execute the bash script, why does it succeed in less than a second?
Here is the log from one of the 'successful' task runs:
bdb1f78ac8d2
*** Found local files:
*** * /opt/airflow/logs/dag_id=SAR_flooding_demo_docker/run_id=scheduled__2024-03-04T10:00:06.056780+00:00/task_id=task_01/attempt=1.log
[2024-03-04, 10:00:08 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [queued]>
[2024-03-04, 10:00:08 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [queued]>
[2024-03-04, 10:00:08 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-03-04, 10:00:08 UTC] {taskinstance.py:2214} INFO - Executing <Task(_PythonDecoratedOperator): task_01> on 2024-03-04 10:00:06.056780+00:00
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:60} INFO - Started process 699 to run task
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'SAR_flooding_demo_docker', 'task_01', 'scheduled__2024-03-04T10:00:06.056780+00:00', '--job-id', '296', '--raw', '--subdir', 'DAGS_FOLDER/SAR_flooding_demonstator_dag.py', '--cfg-path', '/tmp/tmpd36mdp9m']
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:88} INFO - Job 296: Subtask task_01
[2024-03-04, 10:00:09 UTC] {task_command.py:423} INFO - Running <TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [running]> on host bdb1f78ac8d2
[2024-03-04, 10:00:09 UTC] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='SAR_flooding_demo_docker' AIRFLOW_CTX_TASK_ID='task_01' AIRFLOW_CTX_EXECUTION_DATE='2024-03-04T10:00:06.056780+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-03-04T10:00:06.056780+00:00'
[2024-03-04, 10:00:09 UTC] {python.py:202} INFO - Done. Returned value was: None
[2024-03-04, 10:00:09 UTC] {taskinstance.py:1149} INFO - Marking task as SUCCESS. dag_id=SAR_flooding_demo_docker, task_id=task_01, execution_date=20240304T100006, start_date=20240304T100008, end_date=20240304T100009
[2024-03-04, 10:00:09 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-03-04, 10:00:09 UTC] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check
You can't run one task inside another. The inner DockerOperator is never executed, because the Airflow scheduler/worker does not know about it: instantiating an operator inside a @task function only creates the object, it does not run it. Your outer Python task simply constructs the operator and returns, which is why it succeeds in less than a second.
In your case you could use the TaskFlow decorators directly, e.g. the Task Docker Decorator (@task.docker), as sketched below.
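As a rough sketch of what that could look like with your image (assuming the apache-airflow-providers-docker package is installed and that the twm_step01 image contains a Python interpreter, because the decorated function is serialized and executed inside the container; run_in_container and the dag_id are placeholder names):
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id="SAR_flooding_demo_taskflow",  # placeholder dag_id
    start_date=datetime(2024, 1, 15),
    schedule="@continuous",
    max_active_runs=1,
    catchup=False,
)
def demo_runner():
    # The body of this function runs *inside* the twm_step01 container.
    @task.docker(
        image="twm_step01",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        # recent provider versions expect a string ("success"/"never"/"force")
        # here, older ones a bool
        auto_remove="success",
    )
    def run_in_container():
        import subprocess

        # Add the arguments from config.sh here as needed.
        subprocess.run(["bash", "/scripts/my-script.sh"], check=True)

    run_in_container()


demo_runner()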
In Airflow 2.9.0 (not released yet) you will also be able to use the Task Bash Decorator (@task.bash).
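A minimal sketch of that (assuming Airflow >= 2.9.0; the decorated function just returns the bash command string to execute, and run_step01 is a placeholder name):
from airflow.decorators import task


@task.bash
def run_step01() -> str:
    # The returned string is executed as a bash command on the worker.
    return "bash /opt/airflow/scripts/ex-my-script.sh"
Note this runs on the Airflow worker itself, so the script and its input/output paths must be visible inside the Airflow container, as in your volumes: section.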
Or use the classic operators directly, as shown in the sketch below.
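For example, instantiating the DockerOperator at the DAG level instead of inside a @task function registers it as a real task that the scheduler actually runs. A minimal sketch reusing your settings (dag_id is a placeholder; in recent docker provider versions auto_remove expects a string such as "success" rather than a bool):
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.docker.operators.docker import DockerOperator


@dag(
    dag_id="SAR_flooding_demo_docker_operator",  # placeholder dag_id
    start_date=datetime(2024, 1, 15),
    schedule="@continuous",
    max_active_runs=1,
    catchup=False,
)
def demo_runner():
    # Created directly in the DAG function, so Airflow registers and executes it.
    DockerOperator(
        task_id="Task01",
        image="twm_step01",
        api_version="auto",
        auto_remove="success",
        command='echo "this is a test message shown from within the container"',
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
    )


demo_runner()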