bash command while loop only run once within Airflow kubernetespodoperator

833 Views Asked by At

I am trying to run one Kubernetes pod job 6 times. Each time it will print a number and sleep for 5 seconds. However, it only runs once, and then it stops. Here is the full code for the dag file:

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "lab",
    "depends_on_past": False,
    "start_date": days_ago(0),
    "catchup": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "dag_executes_6_times",
    default_args=default_args,
    max_active_runs=1,
    concurrency=10,
)
# use a kube_config stored in s3 dags folder for now
kube_config_path = "/usr/local/airflow/dags/kube_config.yaml"

# Generate 2 tasks
tasks = ["task{}".format(i) for i in range(1, 3)]
example_dag_complete_node = DummyOperator(task_id="example_dag_complete", dag=dag)

org_dags = []
for task in tasks:

    bash_command = "echo HELLO"

    org_node = KubernetesPodOperator(
        namespace="default-airflow",
        image="bash",
        cmds=["bash", "-c"],
        arguments=[
            "bash",
            "-c",
            "i=0;while true;do echo '$i' && ((i++>5)) && break && sleep 5;done",
        ],
        labels={"foo": "bar"},
        image_pull_policy="Always",
        name=task,
        task_id=task,
        is_delete_operator_pod=False,
        get_logs=True,
        dag=dag,
        config_file=kube_config_path,
        in_cluster=False,
        cluster_context="lab",
    )

    org_node.set_downstream(example_dag_complete_node)

It will show as follows: enter image description here

Basically, I want the status to be shown as running for the whole process, not just run once. Please advise and thanks in advance!

1

There are 1 best solutions below

5
On

The loop is running 6 times, it's just not sleeping, so it finishes immediately. The problem is that in the inside of the while loop:

echo '$i' && ((i++>5)) && break && sleep 5

The sleep command is connected to the rest of the command string with &&, which means it'll only run if the rest of the command string succeeds ((i++>5)) and the break command (which actually prevents it from running anyway). You want to connect it with ; instead, so that it runs unconditionally (although the break will skip it on the last iteration). Also, since '$i' is in single-quotes, it won't be replaced by the variable value, just printed literally; you want double-quotes. Here's what the inside of the while loop should look like:

echo "$i" && ((i++>5)) && break; sleep 5

(Note: you'll have to escape those double-quotes appropriately.)