CVAT webhooks + Airflow DAG trigger

90 Views Asked by At

I am trying to configure the activation of a Python script through a sensor from an Airflow DAG that senses the JSON sended by the CVAT webhook once certain conditions are met (for example when the annotation task has finished). I deployed CVAT and Airflow with Docker in 2 separate machines with the same OS and Kernel (Operating System: Ubuntu 22.04.3 LTS, Kernel: Linux 5.15.0-88-generic, Architecture: x86-64).

I already have a working url for the CVAT Webhook, lets call that url https://direction.com/login/, so far, while trying the Ping option from the CVAT UI the response is correct (200) and I can see the produced JSON through ~$ docker logs {container id} comand. I also created an Airflow connection to https://direction.com .Moreover, on my Airflow DAG (that doesn't have any import error or anything like that) I am using an httpSensor with the corresponding http_conn_id and the endpoint /login as it should be.

Once all of this is done, the expected behaviour should be that once the DAG's http sensor reads the webhook's JSON message on the specifyied URL the script execution should be activated.

Here is the code that I have for the airflow DAG:

@dag(
default_args={
    'owner': 'OWNER',
    'start_date': datetime(2023, 1, 1),
},
schedule_interval=None, 
tags=['cvat_dag','webhook'],
catchup=False,)

def cvat_webhook_dag():

@task
def http_sensor_task():
        return HttpSensor(
            task_id='cvat_webhook_sensor',
            http_conn_id='cvat_webhook',
            endpoint='/login/',
            method='GET',
            response_check=lambda response: "CVAT_trial_task_1" in json.dumps(response.json()), 
            timeout=120,
            poke_interval=60
        )

@task
def execute_script():
    # Execute your script here
    print('execute script triggered')

@task
def restart_dag_fun():
    restart_dag = TriggerDagRunOperator(
        task_id = 'restart_dag',
        trigger_dag_id = 'cvat_webhook_sensor',
    )

# Set the task dependencies
http_sensor_task() >> execute_script() >> restart_dag_fun()

I also changed the HttpSensor so that response_check=lambda response: response.status_code==200 to force the script execution with independence of the JSON content, but It didn't work neither. I checked the logs from Airflow and they don't give any meaningfull informationit just says that the execution failed.

Any suggestion would be great. I am not very familiar with Airflow so probably something is wrong but I don't know what might be the solution.

Thank you in advance for your help.

0

There are 0 best solutions below