Python Pandas: Partition size is less than overlapping window size


I am trying to compute the rolling slope of a time series with pandas and swifter, like this:

My code:

import os.path
from os import listdir
from os.path import isfile, join

import numpy as np
import pandas_ta
from scipy.stats import linregress

import pandas as pd
import swifter

FILE_PATH = "Data"


def get_files_ohlc(path: str):
    return [f for f in listdir(path) if isfile(join(path, f))]


def get_slope(array):
    y = np.array(array)
    x = np.arange(len(y))
    slope, intercept, r_value, p_value, std_err = linregress(x, y)
    return slope


def add_slop_indicator(ohlc: pd.DataFrame, ind: str, candles_back: int):
    ohlc[f'slope_{ind.split("_")[1]}_{candles_back}'] = ohlc[ind].swifter.rolling(window=candles_back, min_periods=candles_back).apply(
        get_slope, raw=True)


if __name__ == '__main__':
    files = get_files_ohlc(FILE_PATH)
    dicts = {}

    for file in files:
        name_pair = file.split("_")[0]
        dicts[name_pair] = pd.read_json(os.path.join(FILE_PATH, file))
        dicts[name_pair].rename({0: 'date',
                                 1: 'open',
                                 2: 'high',
                                 3: 'low',
                                 4: 'close',
                                 5: 'volume'}, axis=1, inplace=True)
        dicts[name_pair]['date'] = dicts[name_pair]['date'].values.astype(dtype='datetime64[ms]')
        for val in range(20, 100):
            dicts[name_pair].ta.ema(length=val, append=True)

        print(f"END_{name_pair}")
        for val in range(20, 100):
            for days in range(5, 100):
                add_slop_indicator(dicts[name_pair], f"EMA_{val}", days)
        print(f"DONE {name_pair}")
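For reference, the rolling least-squares slope that `get_slope` computes can also be obtained without swifter or dask. This is a minimal sketch, assuming numpy 1.20 or later (for `sliding_window_view`); `rolling_slope` is a hypothetical helper, not part of pandas or swifter. Centring x = 0..window-1 makes the x values sum to zero, so the slope reduces to sum(xc * y) / sum(xc ** 2), which can be evaluated for every window with a single matrix-vector product:

```python
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view


def rolling_slope(series: pd.Series, window: int) -> pd.Series:
    """Hypothetical helper: least-squares slope of each trailing window,
    aligned like series.rolling(window, min_periods=window), so the first
    window - 1 values are NaN."""
    if window < 2:
        raise ValueError("window must be at least 2 to fit a line")
    y = series.to_numpy(dtype=float)
    out = np.full(len(y), np.nan)
    if len(y) >= window:
        # x = 0..window-1 shifted so that sum(xc) == 0; the OLS slope is then
        # sum(xc * y) / sum(xc ** 2) for each window
        xc = np.arange(window) - (window - 1) / 2
        # read-only view of shape (len(y) - window + 1, window), no copy made
        windows = sliding_window_view(y, window)
        # any NaN inside a window propagates, giving NaN for that window
        out[window - 1:] = windows @ xc / (xc @ xc)
    return pd.Series(out, index=series.index, name=series.name)
```

In the loop above, the swifter call could then be replaced by something like `rolling_slope(ohlc[ind], candles_back)`. Windows that contain NaN (such as the warm-up rows at the start of each EMA column) come out as NaN, just as they do with `rolling(...).apply`.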

After it runs for a while, I get the following error and the program stops:

Error:

dask.multiprocessing.NotImplementedError: Partition size is less than overlapping window size. Try using ``df.repartition`` to increase the partition size.
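As I understand dask's rolling implementation (`overlap_chunk` in the traceback), a trailing window of size w borrows the last w - 1 rows of the previous partition and raises this error when that partition is shorter. swifter picks the partition count itself (`dd.from_pandas(self._comparison_pd, npartitions=self._npartitions)` in the traceback), so as `days` grows toward 99 the window could outgrow the partitions of a shorter file, which might explain why the failure only appears after a while. The sketch below checks that condition; the helper names are hypothetical, and it assumes rows are split into nearly equal contiguous chunks, which dask may not do exactly:

```python
import numpy as np


def partition_sizes(n_rows: int, npartitions: int) -> list[int]:
    # Assumption: n_rows are split into npartitions nearly equal contiguous
    # chunks; dask.dataframe.from_pandas may pick slightly different boundaries.
    return [len(chunk) for chunk in np.array_split(np.arange(n_rows), npartitions)]


def rolling_overlap_ok(n_rows: int, npartitions: int, window: int) -> bool:
    # A trailing window of size `window` needs the last window - 1 rows of the
    # previous partition, so every partition except the last must have that many.
    before = window - 1
    sizes = partition_sizes(n_rows, npartitions)
    return all(size >= before for size in sizes[:-1])


def max_safe_npartitions(n_rows: int, window: int) -> int:
    # Conservative: largest p whose smallest chunk (n_rows // p rows)
    # still holds window - 1 rows.
    before = max(window - 1, 1)
    return max(1, n_rows // before)
```

For example, a hypothetical 3,000-row series split into 32 partitions gives chunks of 93 or 94 rows, too few for `window=99`, while 30 partitions of 100 rows each would pass the check.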

UPDATE:

I've updated my code to make it a minimal, runnable example.

pandas_ta (used via dicts[name_pair].ta) is the library that provides the EMA (exponential moving average) indicator.
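For readers without pandas_ta, an EMA of this kind can be approximated in plain pandas. This is a sketch of a hypothetical helper, `ema_sma_seeded`, meant to mirror what I believe `pandas_ta.ema` does by default (seed with the simple mean of the first `length` values, then apply the recursive EMA with alpha = 2 / (length + 1)); check the pandas_ta source before relying on exact values:

```python
import numpy as np
import pandas as pd


def ema_sma_seeded(close: pd.Series, length: int) -> pd.Series:
    """Hypothetical helper: EMA with alpha = 2 / (length + 1), seeded with the
    simple mean of the first `length` values (assumed pandas_ta default)."""
    seeded = close.astype(float)  # astype returns a new Series, original untouched
    seeded.iloc[: length - 1] = np.nan            # warm-up rows stay NaN
    seeded.iloc[length - 1] = close.iloc[:length].mean()  # SMA seed
    # adjust=False gives the recursion ema_t = alpha * x_t + (1 - alpha) * ema_{t-1}
    return seeded.ewm(span=length, adjust=False).mean()
```

The leading NaN rows are one reason the slope columns start with NaN: any rolling window that overlaps the EMA warm-up period has fewer than `min_periods` valid values.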

All the code can be found in the repository I created for this question; running test.py from it reproduces the error above.

I hope this helps; please let me know if anything needs clarification.

My traceback:

Traceback (most recent call last):
  File "/home/vlad/Crypto_15m_data/test.py", line 51, in <module>
    add_slop_indicator(dicts[name_pair], f"EMA_{val}", days)
  File "/home/vlad/Crypto_15m_data/test.py", line 27, in add_slop_indicator
    ohlc[f'slope_{val}_{days}'] = ohlc[ind].swifter.rolling(window=candles_back, min_periods=candles_back).apply(
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/swifter/swifter.py", line 521, in apply
    return self._dask_apply(func, *args, **kwds)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/swifter/swifter.py", line 562, in _dask_apply
    dd.from_pandas(self._comparison_pd, npartitions=self._npartitions)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/base.py", line 292, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/base.py", line 575, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/multiprocessing.py", line 220, in get
    result = get_async(
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/local.py", line 508, in get_async
    raise_exception(exc, tb)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/multiprocessing.py", line 110, in reraise
    raise exc
dask.multiprocessing.NotImplementedError: Partition size is less than overlapping window size. Try using ``df.repartition`` to increase the partition size.

Traceback
---------
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/local.py", line 221, in execute_task
    result = _execute_task(task, data)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/dataframe/rolling.py", line 29, in overlap_chunk
    raise NotImplementedError(msg)


Process finished with exit code 1