Get the dag_ids of Airflow DAGs that currently have a running task

21 Views Asked by At

I want to clear all task instances that are running at a given moment in Airflow. However, my code fails when a DAG has no running task.

def _iter_pages(url, filters, auth, key, page_size=100):
    """Yield every item listed under *key* across all pages of an Airflow list endpoint.

    Iteration stops on the first non-200 response (which is logged), on an
    empty page, or once ``total_entries`` items have been read.
    ``requests.exceptions.ConnectionError`` propagates to the caller.
    """
    offset = 0
    while True:
        params = dict(filters, limit=page_size, offset=offset)
        response = requests.get(url, params=params, auth=auth)
        if response.status_code != 200:
            logging.error("Request to %s failed with HTTP %s\n%s",
                          url, response.status_code, response.text)
            return
        out = response.json()
        items = out.get(key, [])
        yield from items
        offset += len(items)
        # total_entries is the size of the whole result set, not of this page.
        # Also stop on an empty page so a miscounted total cannot loop forever.
        if not items or offset >= out.get("total_entries", 0):
            return


def get_active_dag(hostname, username, password, running_only=False):
    """Return dag_ids from the Airflow REST API at *hostname*.

    By default this returns the ids of DAGs that are active and not paused.
    It follows pagination, so every matching DAG is returned, not only the
    first page.

    Args:
        hostname: ``host[:port]`` of the Airflow webserver; plain ``http`` is used.
        username: Basic-auth user name.
        password: Basic-auth password.
        running_only: If True, return only the ids of DAGs that currently have
            at least one task instance in the ``running`` state. This uses the
            task-instance listing with ``~`` as a wildcard for every DAG and
            every DAG run.

    Returns:
        A list of unique dag_id strings, in the order the API returned them.
        On a connection failure or a non-200 response the problem is logged,
        and the ids gathered up to that point (possibly none) are returned.
    """
    base_url = "http://{hostname}/api/v1".format(hostname=hostname)
    auth = (username, password)
    if running_only:
        url = base_url + "/dags/~/dagRuns/~/taskInstances"
        filters = {"state": "running"}
        key = "task_instances"
    else:
        url = base_url + "/dags"
        filters = {"only_active": True, "paused": False}
        key = "dags"

    dag_ids = []
    try:
        for item in _iter_pages(url, filters, auth, key):
            dag_ids.append(item["dag_id"])
    except requests.exceptions.ConnectionError:
        logging.error("Failed to connect to HOST\n%s", hostname)
    # Several running task instances can belong to the same DAG: de-duplicate
    # while keeping the first-seen order.
    return list(dict.fromkeys(dag_ids))


def restart_tasks(hostname, username, password, dag_ids):
    """Clear the running task instances of each DAG in *dag_ids* via the Airflow REST API.

    For each DAG, this POSTs to ``/api/v1/dags/{dag_id}/clearTaskInstances``
    with ``only_running`` set and ``dry_run`` disabled, then logs the cleared
    task instances.

    Args:
        hostname: ``host[:port]`` of the Airflow webserver; plain ``http`` is used.
        username: Basic-auth user name.
        password: Basic-auth password.
        dag_ids: Iterable of dag_id strings to process. If it is empty, no
            request is made.

    Exits the process with status 1 on the first non-200 response, after
    logging the response body.
    """
    import sys

    # The request body is the same for every DAG, so build it once.
    payload = {
        "dry_run": False,
        "only_running": True,
        "only_failed": False,
    }
    auth = (username, password)
    for dag_id in dag_ids:
        response = requests.post(
            "http://{hostname}/api/v1/dags/{dag_id}/clearTaskInstances"
            .format(hostname=hostname, dag_id=dag_id), json=payload, auth=auth)
        if response.status_code != 200:
            # Check the status before parsing: an error body is not guaranteed
            # to be JSON, and parsing it first would raise instead of reporting.
            logging.error("Failed \n{}".format(response.text))
            sys.exit(1)
        # A DAG with nothing running may legitimately report no task instances.
        cleared = response.json().get("task_instances", [])
        logging.info("Cleared task for dag_id = {} & tasks={}".format(dag_id, cleared))

Is there any way to get only the dag_ids that have a task running at a given point in time?

0

There are 0 best solutions below