I am trying to test Airflow DAGs, so I created a testing environment: a Docker container with a volume and a connection to a PostgreSQL container.
When the container runs, docker-entrypoint.sh installs all the packages, creates a temporary AIRFLOW_HOME folder, links the needed folders and files from the volume into it, and initializes the Airflow database.
After that, the current directory is changed to AIRFLOW_HOME and pytest is run, as shown below:
pip install -r /tmp/requirements.txt
export AIRFLOW__CORE__LOAD_EXAMPLES=False AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
export AIRFLOW_HOME=$(mktemp -d -t airflow-XXXXXX)
mkdir -p $AIRFLOW_HOME/plugins/backend
ln -s /workspace/pipelines/airflow/dags $AIRFLOW_HOME/dags
ln -s /workspace/pipelines/airflow/plugins/backend/__init__.py $AIRFLOW_HOME/plugins/backend/__init__.py
ln -s /workspace/pipelines/airflow/plugins/backend/data_catalog $AIRFLOW_HOME/plugins/backend/data_catalog
ln -s /workspace/backend/schema_catalog $AIRFLOW_HOME/plugins/backend/schema_catalog
airflow db init
cd $AIRFLOW_HOME
pytest -s /workspace/pipelines/airflow/tests/test_dag.py
test_dag.py contains functions for database migration, reloading data into the tables, running the DAG, and so on. Everything works perfectly fine.
Now I would like to move all of the Airflow environment preparation above into a pytest fixture and leave only the pytest command in the entrypoint script. I rewrote everything as a pytest fixture function, as shown below:
import os

import pytest


@pytest.fixture
def airflow_instance(tmp_path_factory):
    # Set the AIRFLOW_HOME environment variable to a temporary directory
    airflow_home = tmp_path_factory.mktemp('airflow')
    os.environ['AIRFLOW_HOME'] = str(airflow_home)
    # Create necessary directories and symbolic links
    backend_dir = airflow_home / 'plugins' / 'backend'
    backend_dir.mkdir(parents=True)
    dags_dir = airflow_home / 'dags'
    os.symlink('/workspace/pipelines/airflow/dags', str(dags_dir))
    init_py = backend_dir / '__init__.py'
    os.symlink('/workspace/pipelines/airflow/plugins/backend/__init__.py', str(init_py))
    data_catalog_dir = backend_dir / 'data_catalog'
    os.symlink('/workspace/pipelines/airflow/plugins/backend/data_catalog', str(data_catalog_dir))
    schema_catalog_dir = backend_dir / 'schema_catalog'
    os.symlink('/workspace/backend/schema_catalog', str(schema_catalog_dir))
    # Initialize Airflow database
    os.system('airflow db init')
    yield airflow_home
Everything seems equivalent to the previous shell script, but the test raises ModuleNotFoundError: No module named 'backend'.
from backend.schema_catalog.model import citext
E ModuleNotFoundError: No module named 'backend'
/workspace/backend/migrations/versions/3081eed292ae_job_model.py:12: ModuleNotFoundError
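Since the error is an import failure, one way to narrow it down is to check whether the directory that contains the `backend` package is on `sys.path` at the moment the import runs. The following is only a minimal sketch of that check, not something taken from my test suite: `is_importable` is a hypothetical helper, and `demo_backend_pkg` is a made-up stand-in for `backend`, placed under a throwaway `plugins` folder so the example runs anywhere.

```python
import importlib.util
import sys
import tempfile
from pathlib import Path


def is_importable(module_name: str) -> bool:
    """Return True if a top-level module can be located via the current sys.path."""
    # Clear finder caches so freshly created directories are noticed.
    importlib.invalidate_caches()
    return importlib.util.find_spec(module_name) is not None


# Build a throwaway layout: <tmp>/plugins/demo_backend_pkg/__init__.py
# (demo_backend_pkg is a hypothetical stand-in for the real `backend` package).
tmp_root = Path(tempfile.mkdtemp())
plugins_dir = tmp_root / "plugins"
(plugins_dir / "demo_backend_pkg").mkdir(parents=True)
(plugins_dir / "demo_backend_pkg" / "__init__.py").write_text("")

# The package exists on disk, but its parent folder is not on sys.path yet.
found_before = is_importable("demo_backend_pkg")

# Once the parent folder is on sys.path, the package can be located.
sys.path.insert(0, str(plugins_dir))
found_after = is_importable("demo_backend_pkg")

# Clean up the path entry added for the demonstration.
sys.path.remove(str(plugins_dir))
```

Calling `is_importable('backend')` and printing `sys.path` inside the failing test should show whether the temporary `plugins` folder is being searched at all.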
I have tried changing the current directory to AIRFLOW_HOME inside the test script:
os.chdir(airflow_instance)
so that the functions run from that folder, the same way they did when launched from the shell script.
However, I still get the same error.
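Comparing the two setups, there are differences that may or may not matter. The shell script exports three `AIRFLOW__CORE__*` settings that the fixture never sets. It also sets AIRFLOW_HOME before the pytest process starts, whereas the fixture sets it inside an already running Python process. To mirror the script more closely, a command could be run in a child process with the same environment. This is a minimal sketch under those assumptions; `build_airflow_env` and `run_in_airflow_env` are hypothetical helper names, and I have not verified that this alone resolves the import error.

```python
import os
import subprocess
import sys
from pathlib import Path

# The three settings exported by the shell script.
AIRFLOW_SETTINGS = {
    "AIRFLOW__CORE__LOAD_EXAMPLES": "False",
    "AIRFLOW__CORE__ENABLE_XCOM_PICKLING": "True",
    "AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS": "False",
}


def build_airflow_env(airflow_home: Path) -> dict:
    """Copy the current environment and add the variables the script exported."""
    env = dict(os.environ)
    env.update(AIRFLOW_SETTINGS)
    env["AIRFLOW_HOME"] = str(airflow_home)
    return env


def run_in_airflow_env(cmd: list, airflow_home: Path) -> subprocess.CompletedProcess:
    """Run cmd from airflow_home with the script's environment; raise on failure."""
    return subprocess.run(
        cmd,
        env=build_airflow_env(airflow_home),
        cwd=airflow_home,
        capture_output=True,
        text=True,
        check=True,  # unlike os.system, a non-zero exit raises CalledProcessError
    )
```

Inside the fixture this would be called as `run_in_airflow_env(['airflow', 'db', 'init'], airflow_home)` in place of `os.system('airflow db init')`, which also makes a failed initialization visible instead of silently ignored.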