I am new to Dagster and am currently trying to test assets.
I want to make sure that a chain of assets works properly. Here is the context: I have an asset training_set that creates a training dataset from two SourceAssets, data_source1 and data_source2. Those two SourceAssets and the training_set asset use a custom ParquetIOManager.
Then I have another asset, predictions, which takes the training_set asset and a predictor_model SourceAsset (using a custom XGBoostIOManager) as inputs.
The problem arises when I test the training_set and predictions assets as described in the docs at https://docs.dagster.io/concepts/testing:
from dagster import asset, materialize_to_memory


@asset
def data_source():
    return get_data_from_source()


@asset
def structured_data(data_source):
    return extract_structured_data(data_source)


# An example unit test using materialize_to_memory
def test_data_assets():
    result = materialize_to_memory([data_source, structured_data])
Adapted to my code, it would look like this:
Asset file:
import pandas as pd
import xgboost as xgb
from dagster import OpExecutionContext, SourceAsset, asset

# fifteen_minutes_partitions, TARGETS_LABELS and transform_into_training_set
# are defined elsewhere in the project.

data_source1 = SourceAsset(
    key="data_source1",
    io_manager_key="parquet_io_manager",
)

data_source2 = SourceAsset(
    key="data_source2",
    io_manager_key="parquet_io_manager",
)

xgboost_model = SourceAsset(
    key="xgboost_model",
    io_manager_key="xgboost_io_manager",
)


@asset(
    io_manager_key="parquet_io_manager",
    partitions_def=fifteen_minutes_partitions,
)
def training_set(
    context: OpExecutionContext,
    data_source1: pd.DataFrame,
    data_source2: pd.DataFrame,
) -> pd.DataFrame:
    return transform_into_training_set(data_source1, data_source2)


@asset(
    io_manager_key="parquet_io_manager",
    partitions_def=fifteen_minutes_partitions,
)
def predictions(
    context: OpExecutionContext,
    training_set: pd.DataFrame,
    xgboost_model: xgb.XGBRegressor,
) -> pd.DataFrame:
    # Wrap the raw model output in a DataFrame with the target column names.
    return pd.DataFrame(
        xgboost_model.predict(training_set),
        columns=TARGETS_LABELS,
    )
Test file:
from dagster import materialize_to_memory

from assets import *


def test_data_assets():
    result = materialize_to_memory([training_set, predictions])
    assert result.success
The problem is that this fails with an error saying that the io_manager key parquet_io_manager does not exist, because it is not defined anywhere in the asset file (in production, it is defined in a Definitions object).
My question is therefore: how can I test that the training_set and predictions assets work properly together?
I know I could create a job, but sometimes I will not want to test my entire job, just two dependent assets.
Do you have any suggestions?