Dagster: How to test a chain of assets with IO managers

I am new to Dagster and am currently trying to test assets.

The idea is the following: I want to be sure that a chain of assets works properly. Here is the context. I have an asset training_set that creates a training dataset from two SourceAssets, data_source1 and data_source2. Both SourceAssets and the training_set asset use a custom ParquetIOManager.

Then I have another asset, predictions, which takes the training_set asset and an xgboost_model SourceAsset (loaded with a custom XGBoostIOManager) as inputs.

The problem arises when I test the training_set and predictions assets the way the docs suggest (https://docs.dagster.io/concepts/testing):

from dagster import asset, materialize_to_memory


@asset
def data_source():
    # get_data_from_source is a placeholder for your own loading code
    return get_data_from_source()


@asset
def structured_data(data_source):
    # extract_structured_data is a placeholder for your own transformation
    return extract_structured_data(data_source)


# An example unit test using materialize_to_memory
def test_data_assets():
    result = materialize_to_memory([data_source, structured_data])
    assert result.success

Adapted to my code, it looks like this:

Asset file:

import pandas as pd
import xgboost as xgb
from dagster import OpExecutionContext, SourceAsset, asset

# fifteen_minutes_partitions, transform_into_training_set and TARGETS_LABELS
# are defined elsewhere in the project.

data_source1 = SourceAsset(
    key="data_source1",
    io_manager_key="parquet_io_manager",
)

data_source2 = SourceAsset(
    key="data_source2",
    io_manager_key="parquet_io_manager",
)

xgboost_model = SourceAsset(
    key="xgboost_model",
    io_manager_key="xgboost_io_manager",
)


@asset(
    io_manager_key="parquet_io_manager",
    partitions_def=fifteen_minutes_partitions,
)
def training_set(
    context: OpExecutionContext,
    data_source1: pd.DataFrame,
    data_source2: pd.DataFrame,
) -> pd.DataFrame:
    return transform_into_training_set(data_source1, data_source2)


@asset(
    io_manager_key="parquet_io_manager",
    partitions_def=fifteen_minutes_partitions,
)
def predictions(
    context: OpExecutionContext,
    training_set: pd.DataFrame,
    xgboost_model: xgb.XGBRegressor,
) -> pd.DataFrame:
    # Wrap the raw model output in a DataFrame with one column per target
    return pd.DataFrame(
        xgboost_model.predict(training_set),
        columns=TARGETS_LABELS,
    )

Test file:

from dagster import materialize_to_memory

from assets import predictions, training_set


def test_data_assets():
    result = materialize_to_memory([training_set, predictions])
    assert result.success

The problem is that this fails with an error saying that the IO manager key parquet_io_manager does not exist, because it is not defined anywhere in the asset file (in production, it is bound in a Definitions object).

My question is therefore: how can I test that the training_set and predictions assets work properly together?

I know I could create a job, but sometimes I don't want to test the entire job, just two dependent assets.

Have any suggestions?
