I am building some integration tests (in pytest) that load the Airflow (2.8.2) DagBag and confirm that all DAGs scheduled with Data-Aware Scheduling reference Datasets that are the outlets of existing DAG tasks.
The producer side of the relationship works as expected: DagBag.dags["<dag-id>"].tasks[<n>].outlets contains the list of Datasets produced by that task (or, conveniently, an empty list if there are none).
from typing import Dict, List, Tuple

import pytest
from airflow.models import DagBag


# Wrapper to walk the DagBag once and return a tuple of the data we need from
# the walk: (map of dataset URI -> list of producing "dag_id.task_id" ids,
# list of "dag_id.task_id" ids whose outlets are bare strings, not Datasets).
# `dagbag` is a session fixture defined elsewhere (e.g. conftest.py).
@pytest.fixture(scope="session")
def dataset_producer_analysis(dagbag: DagBag) -> Tuple[Dict[str, List[str]], List[str]]:
    producers: Dict[str, List[str]] = {}
    missing_wrapper: List[str] = []
    for dag in dagbag.dags.values():
        for task in dag.tasks:
            full_id = f"{dag.dag_id}.{task.task_id}"
            for outlet in task.outlets:
                if hasattr(outlet, "uri"):
                    outlet = outlet.uri
                else:
                    # Outlet is not wrapped in a Dataset; record the task.
                    missing_wrapper.append(full_id)
                producers.setdefault(outlet, []).append(full_id)
    return (producers, missing_wrapper)


@pytest.fixture(scope="session")
def dag_dataset_producer_map(
    dataset_producer_analysis: Tuple[Dict[str, List[str]], List[str]]
) -> Dict[str, List[str]]:
    return dataset_producer_analysis[0]


@pytest.fixture(scope="session")
def dag_dataset_producer_missing_wrapper(
    dataset_producer_analysis: Tuple[Dict[str, List[str]], List[str]]
) -> List[str]:
    return dataset_producer_analysis[1]
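Once a consumer map with the same shape (dataset URI -> list of ids) is available, the check described at the top reduces to finding consumed URIs that no task produces. A minimal sketch; find_unproduced_datasets is a hypothetical helper name, not an Airflow API, and the plain dicts in the demo stand in for the fixture outputs.

```python
from typing import Dict, List


def find_unproduced_datasets(
    producer_map: Dict[str, List[str]],
    consumer_map: Dict[str, List[str]],
) -> Dict[str, List[str]]:
    """Return {dataset URI: consuming dag_ids} for URIs that no task produces."""
    return {
        uri: sorted(consumers)
        for uri, consumers in consumer_map.items()
        if uri not in producer_map
    }


if __name__ == "__main__":
    producers = {"s3://bucket/raw": ["ingest.extract"]}
    consumers = {
        "s3://bucket/raw": ["transform"],
        "s3://bucket/typo": ["report"],
    }
    # prints {'s3://bucket/typo': ['report']}
    print(find_unproduced_datasets(producers, consumers))
```

In the test itself, asserting that the result is empty (assert not find_unproduced_datasets(dag_dataset_producer_map, consumer_map)) makes pytest print the offending URIs and DAGs on failure.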
But on the schedule side, dag.schedule does not exist, and the (I thought deprecated) dag.schedule_interval is only set to the string "Dataset".
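One avenue worth checking before going through the database: in the Airflow 2.4 to 2.8 source, a DAG created with schedule=[Dataset(...)] keeps those datasets in the in-memory DAG.dataset_triggers attribute (in 2.9 it became a condition object rather than a plain list), so the consumer map may be buildable straight from the DagBag. Treat that attribute name as an assumption to verify against the installed version. The sketch is duck-typed (build_consumer_map is a hypothetical helper) so it can be exercised with stand-in objects; in the real test it would receive dagbag.dags.values().

```python
from types import SimpleNamespace
from typing import Dict, Iterable, List, Tuple


def build_consumer_map(dags: Iterable) -> Tuple[Dict[str, List[str]], List[str]]:
    """Map dataset URI -> consuming dag_ids, plus dag_ids with unwrapped entries.

    Assumes each DAG-like object has `dag_id` and a `dataset_triggers` list
    (assumption: the attribute Airflow 2.4 to 2.8 uses for schedule=[Dataset(...)];
    verify for your version). Objects without the attribute are skipped.
    """
    consumers: Dict[str, List[str]] = {}
    missing_wrapper: List[str] = []
    for dag in dags:
        for ds in getattr(dag, "dataset_triggers", None) or []:
            if hasattr(ds, "uri"):
                ds = ds.uri
            else:
                # Entry is not a Dataset-like object; record the DAG.
                missing_wrapper.append(dag.dag_id)
            consumers.setdefault(ds, []).append(dag.dag_id)
    return consumers, missing_wrapper


if __name__ == "__main__":
    # Stand-ins for DAG and Dataset objects, only to exercise the logic.
    raw = SimpleNamespace(uri="s3://bucket/raw")
    dags = [
        SimpleNamespace(dag_id="transform", dataset_triggers=[raw]),
        SimpleNamespace(dag_id="daily", dataset_triggers=[]),
    ]
    # prints ({'s3://bucket/raw': ['transform']}, [])
    print(build_consumer_map(dags))
```

If the attribute is present, a session-scoped fixture could simply return build_consumer_map(dagbag.dags.values()), which would sidestep the DagModel lookup (and its database session) entirely.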
Poking around in the API documentation, it looks like I can get what I need from the DagModel, but I cannot figure out how to access it in this environment:
from typing import Dict, List, Tuple

import pytest
from airflow.models import DagBag, DagModel


@pytest.fixture(scope="session")
def dag_dataset_schedule_map(dagbag: DagBag) -> Tuple[Dict[str, List[str]], List[str]]:
    consumers: Dict[str, List[str]] = {}
    missing_wrapper: List[str] = []
    for dag in dagbag.dags.values():
        # dag.schedule does not exist, dag.schedule_interval is set to "Dataset".
        # Try loading the model?
        dm = DagModel.get_dagmodel(dag.dag_id)
        for ds in dm.schedule_datasets:
            if hasattr(ds, "uri"):
                ds = ds.uri
            else:
                missing_wrapper.append(dag.dag_id)
            consumers.setdefault(ds, []).append(dag.dag_id)
    return (consumers, missing_wrapper)
This throws a sqlalchemy.orm.exc.DetachedInstanceError on dm.schedule_datasets:
*** sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <DagModel at 0x7fffd024f2e0> is not bound to a Session; lazy load operation of attribute 'schedule_dataset_references' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
What’s interesting to me is that dagbag.sync_to_db() works, so SOME model interaction works in this environment.
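The error message points at the likely cause: get_dagmodel is wrapped in @provide_session, so when no session is passed it opens its own and closes it before returning, leaving the DagModel detached by the time schedule_datasets is lazily loaded. A sketch of the usual SQLAlchemy remedy is to keep one session open for both the lookup and the attribute access. It assumes create_session from airflow.utils.session and the session= keyword on DagModel.get_dagmodel (both present in 2.8 as far as I know; verify for your version). Those are passed in as parameters to a hypothetical dataset_schedule_map_from_db helper so the control flow can be exercised without a database. The rows presumably exist only after dagbag.sync_to_db() has written them, so get_dagmodel may return None.

```python
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Callable, ContextManager, Dict, Iterable, List, Tuple


def dataset_schedule_map_from_db(
    dag_ids: Iterable[str],
    session_scope: Callable[[], ContextManager],
    get_dagmodel: Callable,
) -> Tuple[Dict[str, List[str]], List[str]]:
    """Build URI -> consuming dag_ids while a single session stays open.

    Intended Airflow call (assumed signatures):
        dataset_schedule_map_from_db(dagbag.dag_ids, create_session, DagModel.get_dagmodel)
    """
    consumers: Dict[str, List[str]] = {}
    not_in_db: List[str] = []
    with session_scope() as session:
        for dag_id in dag_ids:
            dm = get_dagmodel(dag_id, session=session)
            if dm is None:
                # No row yet, e.g. sync_to_db() has not run for this DAG.
                not_in_db.append(dag_id)
                continue
            # The lazy load happens here, while `session` is still open.
            # Assumption: entries are DatasetModel rows, which always have .uri.
            for ds in dm.schedule_datasets:
                consumers.setdefault(ds.uri, []).append(dag_id)
    return consumers, not_in_db


if __name__ == "__main__":
    # Fakes standing in for create_session and DagModel.get_dagmodel.
    @contextmanager
    def fake_session():
        yield object()

    rows = {
        "transform": SimpleNamespace(
            schedule_datasets=[SimpleNamespace(uri="s3://bucket/raw")]
        )
    }

    def fake_get_dagmodel(dag_id, session):
        return rows.get(dag_id)

    # prints ({'s3://bucket/raw': ['transform']}, ['new_dag'])
    print(dataset_schedule_map_from_db(["transform", "new_dag"], fake_session, fake_get_dagmodel))
```

The key design point is that every lazy-loaded relationship is touched inside the with block; once the session closes, the same attribute access would raise DetachedInstanceError again.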