Airflow task deleting XComs being detected as zombie job


I'm new to Airflow and wanted to create a DAG that removes XComs. This is my simple code:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import XCom
from airflow.utils.session import provide_session
from airflow.utils.trigger_rule import TriggerRule


@dag(
    start_date=datetime(2024, 3, 15),
    schedule_interval='@daily',
    catchup=False
)
def delete_xcom():
    @provide_session
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup_xcom(session=None):
        # Delete all XCom rows created by this DAG
        session.query(XCom).filter(XCom.dag_id == "delete_xcom").delete()

    cleanup_xcom()


delete_xcom()

But the task keeps failing, and this is the log:


[2024-03-16, 20:38:21 IST] {standard_task_runner.py:107} ERROR - Failed to execute job 203 for task cleanup_xcom ((sqlite3.OperationalError) database is locked
[SQL: INSERT INTO log (dttm, dag_id, task_id, map_index, event, execution_date, owner, owner_display_name, extra) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)]
[parameters: ('2024-03-16 18:38:16.535825', 'delete_xcom', 'cleanup_xcom', -1, <TaskInstanceState.SUCCESS: 'success'>, '2024-03-16 18:38:13.928078', 'airflow', None, None)]
(Background on this error at: https://sqlalche.me/e/14/e3q8); 757)
[2024-03-16, 20:38:21 IST] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-03-16, 20:38:21 IST] {taskinstance.py:3312} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-03-16, 20:38:22 IST] {scheduler_job_runner.py:1755} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/delete_xcom.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'delete_xcom', 'Task Id': 'cleanup_xcom', 'Run Id': 'manual__2024-03-16T18:38:13.928078+00:00', 'Hostname': 'b0c160b98f88'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f8db16939d0>, 'is_failure_callback': True} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)

I tried restarting the Airflow container and creating a new DAG. From what I've seen online, this code should work.

1 Answer

Answered by mujiannan:

The error message is clear:

(sqlite3.OperationalError) database is locked

Don't use SQLite. Point your Airflow metadata database at an external PostgreSQL or MySQL instance instead.

Airflow Documentation: Choosing database backend

By default, Airflow uses SQLite, which is intended for development purposes only.
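
If it helps, here is a minimal sketch of pointing Airflow at PostgreSQL. The connection details (airflow_user, airflow_pass, postgres-host, airflow_db) are placeholders rather than values from your setup, so replace them with your own; on Airflow 2.3+ the option lives in the [database] section of airflow.cfg (older releases use [core]):

# airflow.cfg
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@postgres-host:5432/airflow_db

# or, for a containerized setup, the equivalent environment variable
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow_user:airflow_pass@postgres-host:5432/airflow_db

After changing the connection string, initialize the new database (airflow db init, or airflow db migrate on recent versions) and restart the scheduler and webserver. Unlike SQLite, PostgreSQL handles concurrent writes, so the "database is locked" error from your log should go away.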