Dask .categorize very slow

I am trying to use Dask to perform feature extraction (using tsfresh) on a very large dataset, but I am running into very long processing times.

My data looks as follows.

(screenshot of a pandas DataFrame with an 'id' column, a 'time' column, and the 21 EEG channel columns listed in the melt call below)

I have it all stored in Parquet files on my hard drive.

To begin with, I read the data into a Dask DataFrame using the following code.

import dask
from dask import dataframe as dd

df = dd.read_parquet("/Users/oskar/Library/Mobile Documents/com~apple~CloudDocs/Documents/Studies/BSc Sem 7/Bachelor Project/programs/python/data/*/data.parquet")

I then initialise a Dask cluster.

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=8,
                       threads_per_worker=1, 
                       scheduler_port=8786, 
                       memory_limit='2GB')

cluster.scheduler_address

After that I connect tsfresh's ClusterDaskDistributor to the cluster, which starts a Dask client.

from tsfresh.utilities.distribution import ClusterDaskDistributor

dask_distributor = ClusterDaskDistributor(address="127.0.0.1:8786")

dask_distributor.client

I then melt 'df'...

dfm = df.melt(id_vars=["id", "time"],
              value_vars=['FP1-F7','F7-T7','T7-P7','P7-O1','FP1-F3','F3-C3','C3-P3','P3-O1','FP2-F4','F4-C4',
                          'C4-P4','P4-O2','FP2-F8','F8-T8','T8-P8','P8-O2','FZ-CZ','CZ-PZ','T7-FT9','FT9-FT10',
                          'FT10-T8'],
              var_name="kind",
              value_name="value")

... and group it.

dfm_grouped = dfm.groupby(["id", "kind"])

I then call 'dask_feature_extraction_on_chunk' on the grouped data.

from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
from tsfresh.feature_extraction.settings import MinimalFCParameters

features = dask_feature_extraction_on_chunk(dfm_grouped, 
                                            column_id="id", 
                                            column_kind="kind",
                                            column_sort="time",
                                            column_value="value", 
                                            default_fc_parameters=MinimalFCParameters())

I then finally try to categorize it. This is the step that takes absolutely forever, and I'm wondering whether it is possible to speed it up.

features = features.categorize(columns=["variable"])

After that I intend to reset the index and pivot the table; presumably this will take forever as well.

features = features.reset_index(drop=True)

feature_table = features.pivot_table(index="id",
                                     columns="variable",
                                     values="value",
                                     aggfunc="sum")

Not to mention the actual computation...

df_features = feature_table.compute()

Again: is there any way I can set up Dask to compute this faster? My computer has 16 GB of memory. Thank you.

1 Answer

Weston A. Greene:

Calling .categorize triggers a compute of the full pipeline in order to get the set of categories.
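
For illustration, here is a minimal sketch on a small, hypothetical frame (not your actual data) showing the eager behaviour:

import pandas as pd
import dask.dataframe as dd

# Tiny stand-in frame with the same "variable"/"value" shape as the pipeline.
pdf = pd.DataFrame({"variable": ["a", "b", "a", "c"], "value": [1, 2, 3, 4]})
ddf = dd.from_pandas(pdf, npartitions=2)

# .categorize must scan the data to discover the category values, so the
# whole graph up to this point is executed immediately.
eager = ddf.categorize(columns=["variable"])
print(eager["variable"].cat.known)  # True: categories were computed eagerly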

To schedule a lazy categorization, use .astype({'mycol': 'category', ...}). This won't offer some of the optimizations available when the categories are known, but it will preserve the laziness of the operation.
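
As a sketch, assuming the same 'features' frame as in the question, the lazy version could look like this (list_of_feature_names is a hypothetical placeholder for category values you happen to know up front, e.g. the expected tsfresh feature names):

import pandas as pd

# Lazy cast: nothing is scanned; the categories are left "unknown".
features = features.astype({"variable": "category"})
print(features["variable"].cat.known)  # False: the operation stayed lazy

# If the category values are known in advance, declaring them explicitly
# keeps the cast lazy while marking the categories as "known", which
# operations such as pivot_table require.
dtype = pd.CategoricalDtype(categories=list_of_feature_names)  # hypothetical list
features = features.astype({"variable": dtype})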

(Credit to the comments made by @Michael_Delgado; however, I could not confirm his claim that the docs mention that .categorize will trigger a compute().)