Airflow: Find a DAG's scheduled Dataset URIs from the DagBag


I am building some integration tests (in pytest) that load the Airflow (2.8.2) DagBag and confirm that all DAGs scheduled with Data-Aware Scheduling reference Datasets that are the outlets of existing DAG Tasks.
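For reference, the shared dagbag fixture behind these tests looks roughly like this (the dag folder path is specific to my repo):

import pytest
from airflow.models import DagBag

# Session-scoped so the (slow) DAG parse happens once per test run.
@pytest.fixture(scope="session")
def dagbag() -> DagBag:
    return DagBag(dag_folder="dags/", include_examples=False)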

The producer side of the relationship works as expected: DagBag.dags["<dag-id>"].tasks[<n>].outlets contains the list of Datasets produced by that task (or, conveniently, an empty list if there are none).
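For concreteness, a producer/consumer pair in my dags folder looks something like this (the dag ids and URI here are made up):

import pendulum
from airflow.datasets import Dataset
from airflow.decorators import dag, task

ORDERS = Dataset("s3://example-bucket/orders.parquet")  # hypothetical URI

@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def orders_producer():
    @task(outlets=[ORDERS])
    def write_orders():
        ...  # produce the file

    write_orders()

@dag(schedule=[ORDERS], start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def orders_consumer():
    @task
    def read_orders():
        ...  # consume the file

    read_orders()

orders_producer()
orders_consumer()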

# Wrapper to walk this tree once and return a tuple of the data we need from
# the walk: a map of outlet URI -> list of producing "dag_id.task_id" strings,
# plus the list of producers whose outlets are bare strings rather than
# Dataset objects.
import pytest
from typing import Dict, List
from airflow.models import DagBag

@pytest.fixture(scope="session")
def dataset_producer_analysis(dagbag: DagBag) -> tuple[Dict[str, List[str]], List[str]]:
    producer_map: Dict[str, List[str]] = {}
    missing_wrapper: List[str] = []
    for dag in dagbag.dags.values():
        for task in dag.tasks:
            full_id = f"{dag.dag_id}.{task.task_id}"
            for outlet in task.outlets:
                if hasattr(outlet, "uri"):
                    outlet = outlet.uri
                else:
                    missing_wrapper.append(full_id)
                producer_map.setdefault(outlet, []).append(full_id)
    return (producer_map, missing_wrapper)


@pytest.fixture(scope="session")
def dag_dataset_producer_map(
    dataset_producer_analysis: tuple[Dict[str, List[str]], List[str]]
) -> Dict[str, List[str]]:
    return dataset_producer_analysis[0]


@pytest.fixture(scope="session")
def dag_dataset_producer_missing_wrapper(
    dataset_producer_analysis: tuple[Dict[str, List[str]], List[str]]
) -> List[str]:
    return dataset_producer_analysis[1]
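These get consumed by straightforward tests; for example:

def test_every_outlet_is_a_dataset(dag_dataset_producer_missing_wrapper: List[str]) -> None:
    # Any dag_id.task_id listed here declared a bare-string outlet
    # instead of wrapping it in a Dataset.
    assert dag_dataset_producer_missing_wrapper == []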

But on the schedule side, dag.schedule does not exist, and the (I thought deprecated) dag.schedule_interval is just set to the string "Dataset".

Poking around in the API documentation, it looks like I can get what I need from the DagModel, but I cannot figure out how to access that in this environment:

from airflow.models import DagModel

@pytest.fixture(scope="session")
def dag_dataset_schedule_map(dagbag: DagBag) -> tuple[Dict[str, List[str]], List[str]]:
    schedule_map: Dict[str, List[str]] = {}
    missing_wrapper: List[str] = []
    for dag in dagbag.dags.values():
        # dag.schedule does not exist, dag.schedule_interval is set to "Dataset".
        # Try loading the model?
        dm = DagModel.get_dagmodel(dag.dag_id)
        for ds in dm.schedule_datasets:
            if hasattr(ds, "uri"):
                ds = ds.uri
            else:
                missing_wrapper.append(dag.dag_id)
            schedule_map.setdefault(ds, []).append(dag.dag_id)

    return (schedule_map, missing_wrapper)

This throws a sqlalchemy.orm.exc.DetachedInstanceError on the dm.schedule_datasets access:

*** sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <DagModel at 0x7fffd024f2e0> is not bound to a Session; lazy load operation of attribute 'schedule_dataset_references' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)

What’s interesting to me is that dagbag.sync_to_db() works, so SOME model interaction works in this environment.
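My guess is that the relationship has to be read while a session is still open, something like the sketch below (run after dagbag.sync_to_db() so the model rows exist), but I don't know whether this is the intended way to get at it from a test:

from airflow.models import DagModel
from airflow.utils.session import create_session

with create_session() as session:
    dm = DagModel.get_dagmodel(dag.dag_id, session=session)
    # Read the lazy-loaded relationship before the session closes.
    uris = [ds.uri for ds in dm.schedule_datasets]

Is there a supported way to get the scheduled Dataset URIs for a DAG straight from the DagBag?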
