I am trying to run one Kubernetes pod job 6 times. Each time it will print a number and sleep for 5 seconds. However, it only runs once, and then it stops. Here is the full code for the dag file:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "lab",
    "depends_on_past": False,
    "start_date": days_ago(0),
    "catchup": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "dag_executes_6_times",
    default_args=default_args,
    max_active_runs=1,
    concurrency=10,
)

# use a kube_config stored in s3 dags folder for now
kube_config_path = "/usr/local/airflow/dags/kube_config.yaml"

# Generate 2 tasks
tasks = ["task{}".format(i) for i in range(1, 3)]
example_dag_complete_node = DummyOperator(task_id="example_dag_complete", dag=dag)

org_dags = []
for task in tasks:
    bash_command = "echo HELLO"
    org_node = KubernetesPodOperator(
        namespace="default-airflow",
        image="bash",
        cmds=["bash", "-c"],
        arguments=[
            "bash",
            "-c",
            "i=0;while true;do echo '$i' && ((i++>5)) && break && sleep 5;done",
        ],
        labels={"foo": "bar"},
        image_pull_policy="Always",
        name=task,
        task_id=task,
        is_delete_operator_pod=False,
        get_logs=True,
        dag=dag,
        config_file=kube_config_path,
        in_cluster=False,
        cluster_context="lab",
    )
    org_node.set_downstream(example_dag_complete_node)
Basically, I want the status to be shown as running for the whole process, not just run once. Please advise and thanks in advance!
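The loop is running, it's just not sleeping, so it finishes immediately. (It actually makes seven passes, for `i` = 0 through 6, because `((i++>5))` only becomes true once `i` reaches 6.) The problem is inside the `while` loop: the `sleep` command is chained to the rest of the command string with `&&`, which means it only runs if everything before it succeeds, including `((i++>5))` and the `break` command. `((i++>5))` fails on every pass except the last, and on the last pass `break` leaves the loop before `sleep` could run anyway, so `sleep` never executes. You want to connect it with `;` instead, so that it runs unconditionally (although the `break` will still skip it on the last iteration). Also, since `'$i'` is in single quotes, it won't be replaced by the variable's value, just printed literally; you want double quotes. Here's what the inside of the `while` loop should look like:

echo "$i" && ((i++>5)) && break; sleep 5

(Note: you'll have to escape those double quotes appropriately, since the Python string in `arguments` is itself delimited by double quotes, e.g. `"i=0;while true;do echo \"$i\" && ((i++>5)) && break; sleep 5;done"`.)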
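To check the fix outside Airflow, here is a minimal Python sketch, assuming a local `bash` is on the PATH. It builds the corrected loop as a single-quoted Python string (so the double quotes around `$i` need no escaping) and runs it the way the pod would, as `cmds + arguments`. One more thing that may be worth checking: the posted `arguments` list repeats `"bash", "-c"` on top of `cmds=["bash", "-c"]`, so the container's full command line becomes `bash -c bash -c '<script>'`. Under bash's `-c` rules the outer shell then executes the string `bash`, with `-c` and the script only landing in `$0` and `$1`, so the sketch passes the script as the sole argument. The `build_loop_script` helper and its `sleep_seconds` parameter are hypothetical, added only so the check runs without waiting 30 seconds.

```python
import subprocess


def build_loop_script(sleep_seconds: int = 5) -> str:
    """Return the corrected loop script (hypothetical helper for this sketch).

    The Python string is single-quoted, so the double quotes around $i need
    no escaping. Using `;` rather than `&&` before `sleep` lets it run on
    every pass except the last one, where `break` leaves the loop first.
    """
    return (
        'i=0;while true;do echo "$i" && ((i++>5)) && break; '
        f"sleep {sleep_seconds};done"
    )


# Values for KubernetesPodOperator: the script is the ONLY argument,
# because cmds already supplies "bash", "-c".
cmds = ["bash", "-c"]
arguments = [build_loop_script()]
print(cmds + arguments)

# Local check (assumes bash on PATH), with sleep 0 so it returns at once.
fixed = subprocess.run(
    cmds + [build_loop_script(sleep_seconds=0)],
    capture_output=True,
    text=True,
    check=True,
)
print(fixed.stdout.split())  # the seven values 0 through 6

# The posted arguments list, for comparison: the outer bash runs the string
# "bash", which reads commands from stdin (empty here) and never sees the loop.
posted = subprocess.run(
    cmds + ["bash", "-c", build_loop_script(sleep_seconds=0)],
    stdin=subprocess.DEVNULL,
    capture_output=True,
    text=True,
)
print(repr(posted.stdout))
```

In the DAG itself, that amounts to `arguments=[...]` holding just the loop string, next to the existing `cmds=["bash", "-c"]`. If you want exactly six numbers, as the question describes, starting from `i=1` and testing `((i++>=6))` instead prints 1 through 6 with five sleeps in between.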