Airflow state toggles between success and removed


The tasks in my Airflow DAG keep toggling between the success and removed states.

I am not sure why a task's state goes from success to removed.

My DAG code is:

from airflow import DAG
import datetime
from datetime import timedelta
from tasks.user_space_tables_refresh import UserSpaceTablesRefresh

default_args = {
    'owner': 'Data Engineering',
    'depends_on_past': False,
    'start_date': datetime.datetime(2022, 2, 1),
}

user_space_dag = DAG(
    'user_space_snowflake_tables_refresh', default_args=default_args,
    schedule_interval="30 18 * * *", catchup=False)

with user_space_dag:
    users_task = UserSpaceTablesRefresh(
        task_id='ingest_USERS_data',
        source_table='USERS')

    saved_software_task = UserSpaceTablesRefresh(
        task_id='ingest_SAVED_SOFTWARE_data',
        source_table='SAVED_SOFTWARE')

tasks.user_space_tables_refresh file:

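from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class UserSpaceTablesRefresh(BaseOperator):
    @apply_defaults
    def __init__(self, source_table, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.table = source_table

    def execute(self, context):
        try:
            sf_table = self.table
            ...

        except Exception as ex:
            # The exception is swallowed here, so the task still finishes as success.
            print("Exception")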

Answer by Hussein Awala:

A task instance is marked as removed when its task no longer exists in the DAG after the run has started.

This can happen when you create tasks dynamically, for example:

  • a condition that controls task creation has changed since the run started
  • the tasks are built from a remote configuration file that is inaccessible the next time the DAG files are processed
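To make the second case concrete, here is a minimal sketch in plain Python (no Airflow import) of how a task id can vanish between two parses. Everything in it is an assumption for illustration: the `tables.json` file, `DEFAULT_TABLES`, and the helper functions are hypothetical, and the comparison only imitates the idea of "task present at run start, missing after re-parse", not Airflow's actual internals. The DAG in the question is static, so this shows the mechanism rather than a diagnosis of that DAG.

```python
import json
import os
import tempfile

# Hypothetical fallback used when the config file cannot be read.
DEFAULT_TABLES = ["USERS"]


def load_tables(config_path):
    """Return the table list from a JSON config, or the fallback if it is unreadable."""
    try:
        with open(config_path) as fh:
            return json.load(fh)["tables"]
    except (OSError, ValueError, KeyError):
        return list(DEFAULT_TABLES)


def task_ids_for(tables):
    """Build task ids with the question's naming scheme: ingest_<TABLE>_data."""
    return {f"ingest_{table}_data" for table in tables}


def removed_task_ids(ids_at_run_start, ids_after_reparse):
    """Task ids that existed when the run started but are missing after a re-parse."""
    return sorted(ids_at_run_start - ids_after_reparse)


# First parse: the config file is readable, so both tasks are generated.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "tables.json")
    with open(path, "w") as fh:
        json.dump({"tables": ["USERS", "SAVED_SOFTWARE"]}, fh)
    ids_at_run_start = task_ids_for(load_tables(path))

# Second parse: the file is gone, so only the fallback task is generated.
ids_after_reparse = task_ids_for(load_tables(path))

missing = removed_task_ids(ids_at_run_start, ids_after_reparse)
print(missing)
```

If the file becomes readable again on a later parse, the task reappears, which would match the back-and-forth pattern described in the question.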

If the run history is not important to you, try removing the DAG completely from the metastore and letting the Airflow DAG file processor re-create it. The cause may be a mismatch between serialized DAG versions, or a problem that appeared after an Airflow upgrade.

airflow dags delete <dag_id>

This command deletes the DAG from the metastore, along with all of its DAG runs and task instance information.