Situation: I have monthly snapshots that should look like this

```
snapshot-2021-10.parquet
snapshot-2021-11.parquet
snapshot-2021-12.parquet
snapshot-2022-01.parquet
snapshot-2022-02.parquet
```
In the processing, I need the last n (say: 3) before a given date. So if the date is 2022-01, I would need to process 2021-11, 2021-12, and 2022-01.
Imagine the processing node wrapping a function

```python
def process(snapshots: List[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(snapshots).groupby("id")["value"].sum().reset_index()
```
Question:
How do I set up the node, pipeline, and data catalog entry for this? The goal is to be able to just call `kedro run --pipeline processing --params yearmon:2022-01`.
What I considered:
- Create a manual entry for every dataset (problem: I would need to rewrite the dates in the pipeline for every run)
- Use versioned datasets (problem: I failed to see how I could use multiple versions of the same dataset in the same run)
I think you're looking for `PartitionedDataSet` or `IncrementalDataSet`: you get a dictionary mapping partition IDs to lazy `load()` methods, which you can use like this. Also, I'm pretty sure Spark and Dask allow you to use wildcards here, i.e. `snapshot-*.parquet`.
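To make that concrete, here is a minimal sketch of a node that consumes a `PartitionedDataSet` and picks the last n partitions before the given month. The catalog entry name `snapshots`, the helper names, and the node signature are assumptions for illustration; the catalog side would be a `PartitionedDataSet` pointing at the folder of parquet files with a `pandas.ParquetDataSet` as the underlying dataset.

```python
# Sketch only: assumes partition IDs look like "snapshot-YYYY-MM" (as in the
# question) and that the catalog exposes them via a PartitionedDataSet, which
# Kedro passes to the node as a dict of partition ID -> lazy load() callable.
from typing import Callable, Dict, List

import pandas as pd


def select_last_n(partition_ids: List[str], yearmon: str, n: int = 3) -> List[str]:
    """Pick the n most recent partition IDs at or before `yearmon`.

    Plain string comparison works because the IDs end in zero-padded YYYY-MM.
    """
    eligible = sorted(pid for pid in partition_ids if pid <= f"snapshot-{yearmon}")
    return eligible[-n:]


def process_last_n(
    partitions: Dict[str, Callable[[], pd.DataFrame]],
    yearmon: str,
    n: int = 3,
) -> pd.DataFrame:
    """Load only the selected partitions (calling their lazy loaders) and aggregate."""
    wanted = select_last_n(list(partitions), yearmon, n)
    frames = [partitions[pid]() for pid in wanted]  # only these files are read
    return pd.concat(frames).groupby("id")["value"].sum().reset_index()
```

In the pipeline you would wire the runtime parameter in via Kedro's `params:` prefix, e.g. `node(process_last_n, inputs=["snapshots", "params:yearmon"], outputs="aggregated")`, so `kedro run --params yearmon:2022-01` selects the window without touching the catalog.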