compute() command doesn't work on Dask Series in Python

I'm trying to compute pairwise ratios on large-scale data where each column is a separate sample, like this (this is a small example):

           0         1         2
0      34.04     56.55     49.65
1      35.86     49.28     42.36
2      11.21     33.96     18.11
3      12.40     23.17     16.28
4    1087.51     93.37    166.75
5     494.39    182.15    893.23
6   11018.44   6044.63  17347.33
7      20.48     38.01     92.82
8   14866.34   9034.35  19625.89
9   21932.70  12289.75  43752.48
10   8561.40   3279.36   8717.61
11   1050.82   1302.27   1951.25
12    978.03    202.63    179.67
13     15.22     28.82     22.21
14     42.94     40.77     84.22
15    231.05     66.02    220.13
16     69.01     57.45     85.20
17    309.21     88.90   1394.35
18  13957.93   8632.00  35660.11

So, for each column separately, I want to get a Series that contains the first value divided by the second value, then the first value divided by the third value, and so on, like this (only calculated for the first few rows):

          0        1        2
0     0.949    1.147     1.17
1      3.03    1.665     2.74
2      2.74     2.44     3.04
3     0.031      0.6     0.29
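For reference, the "first value divided by each later value" step can be reproduced with plain pandas on the first five rows of the sample, which is a handy way to check expected numbers before involving Dask. This is a minimal sketch: the DataFrame is typed in by hand from the question's data, and DataFrame.rdiv with axis="columns" computes first_row / remaining_rows column by column. The printed values are rounded, whereas the table above truncates them.

```python
import pandas as pd

# First five rows of the sample data from the question, typed in by hand
df = pd.DataFrame({
    0: [34.04, 35.86, 11.21, 12.40, 1087.51],
    1: [56.55, 49.28, 33.96, 23.17, 93.37],
    2: [49.65, 42.36, 18.11, 16.28, 166.75],
})

# rdiv computes other / self; with axis="columns" the first row (a Series)
# is aligned on the column labels, so every column uses its own first value
first_ratios = df.iloc[1:].rdiv(df.iloc[0], axis="columns").reset_index(drop=True)

print(first_ratios.round(3))
```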

I'm working with dask and this is what I did so far:

import os

import dask.dataframe as dd
import pandas as pd

def create_dask(df):
    delayed_computations = []

    # Convert each column to a Dask Series and then create a Dask DataFrame
    for column_name in df.columns:
        dask_series = dd.from_pandas(df[column_name],
                                     npartitions=1)  #keep each column in one partition

        computation = dask_series.map_partitions(lambda df: calculate_ratios(df),
                                                 meta=pd.Series([], dtype='float64'))

        delayed_computations.append(computation)


    return dask_results

def calculate_ratios(data):
    # Calculate pairwise ratios for each value in the column
    file_len = 19
    ratios = []

    for i in range(file_len - 1):
        ratio_values = data.iloc[i] / data.iloc[i + 1:]
        ratios.append(ratio_values)

    return dd.concat(ratios, axis=1, ignore_index=True)
    

if __name__ == '__main__':
    df_lung = pd.read_excel(os.path.join(base_path, 'raw_data_49_lung_cluster90 - Copy.xls'))

    dask_lung = create_dask(df_lung)

My problem is that I can't compute and inspect the result of 'computation' or of any Dask structure built after it. Calling compute() on dask_series works and shows my data, but computing anything after that takes a very long time, so I can't tell whether my 'calculate_ratios' function actually works. Why is this happening, and how can I fix it?

Thank you!

Answer by Amira Bedhiafi

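Your create_dask function returns dask_results, which is never defined, so calling it raises a NameError; the per-column results collected in delayed_computations are never combined or returned. In calculate_ratios you're using dd.concat, but you should be using pd.concat: map_partitions passes each partition to your function as a plain pandas Series, so everything inside calculate_ratios is pandas, not Dask. Also, pd.concat with axis=1 turns the list of Series into a DataFrame, which doesn't match the meta=pd.Series(...) you declared; axis=0 stacks all the ratios into a single Series. Another thing: looping over the rows in Python is slow for large data, and the output itself has n(n-1)/2 values per column, so time and memory grow quadratically with the number of rows, which by itself can make compute() very slow on a large dataset. Finally, file_len is hard-coded to 19, so the function only works for columns of exactly that length; use len(data) instead.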
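To see the axis difference concretely, this small sketch builds the ratio pieces for the first four values of column 0 and concatenates them both ways. The variable names are illustrative only. With axis=1 each piece becomes a column of a NaN-padded DataFrame; with axis=0 they are stacked into one flat Series, which is what a Series meta expects.

```python
import pandas as pd

# Stand-in for one partition: the first four values of column 0
data = pd.Series([34.04, 35.86, 11.21, 12.40], name=0)

# One Series per starting row i: data[i] / data[i+1:]
pieces = [data.iloc[i] / data.iloc[i + 1:] for i in range(len(data) - 1)]

# axis=1: pieces become columns, aligned on their original row labels (NaN-padded)
side_by_side = pd.concat(pieces, axis=1, ignore_index=True)

# axis=0: pieces are stacked end to end into a single Series
stacked = pd.concat(pieces, axis=0, ignore_index=True)

print(type(side_by_side).__name__, side_by_side.shape)
print(type(stacked).__name__, stacked.shape)
```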

import dask.dataframe as dd
import pandas as pd
import os

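def create_dask(df):
    delayed_computations = []

    for column_name in df.columns:
        # Keep each column in one partition: every ratio needs the whole column
        dask_series = dd.from_pandas(df[column_name], npartitions=1)

        # map_partitions hands calculate_ratios a plain pandas Series
        computation = dask_series.map_partitions(calculate_ratios,
                                                 meta=pd.Series([], dtype='float64'))
        delayed_computations.append(computation)

    # Combine the per-column results into one Dask DataFrame (one column per sample)
    return dd.concat(delayed_computations, axis=1)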

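def calculate_ratios(data):
    # data is a single pandas Series (one column)
    ratios = []
    file_len = len(data)  # use the actual length instead of hard-coding 19

    for i in range(file_len - 1):
        # Divide the i-th value by every value that follows it
        ratio_values = data.iloc[i] / data.iloc[i + 1:]
        ratios.append(ratio_values)

    # Stack everything into one long Series, matching the declared meta
    return pd.concat(ratios, axis=0, ignore_index=True)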

if __name__ == '__main__':
    base_path = 'your_base_path_here'
    df_lung = pd.read_excel(os.path.join(base_path, 'raw_data_49_lung_cluster90 - Copy.xls'))
    
    dask_lung = create_dask(df_lung)
    computed_result = dask_lung.compute()
    print(computed_result)
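To check whether calculate_ratios does what you expect, you can call it directly on a plain pandas Series before wrapping it in Dask. This sketch repeats the same logic as a list comprehension and runs it on column 0 of the question's data. For n rows the result should hold n(n-1)/2 values, so 19 rows give 171 ratios; that quadratic growth is worth keeping in mind for the full dataset.

```python
import pandas as pd

def calculate_ratios(data):
    # Same logic as above: data[i] / data[i+1:] for every i, stacked into one Series
    ratios = [data.iloc[i] / data.iloc[i + 1:] for i in range(len(data) - 1)]
    return pd.concat(ratios, axis=0, ignore_index=True)

# Column 0 of the sample data from the question (19 rows)
col0 = pd.Series([34.04, 35.86, 11.21, 12.40, 1087.51, 494.39, 11018.44,
                  20.48, 14866.34, 21932.70, 8561.40, 1050.82, 978.03,
                  15.22, 42.94, 231.05, 69.01, 309.21, 13957.93])

result = calculate_ratios(col0)
n = len(col0)
assert len(result) == n * (n - 1) // 2  # one ratio per pair (i, j) with i < j
print(result.head(4))
```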

Update:

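You can switch to NumPy: vectorized NumPy array operations are generally faster than looping over pandas objects in plain Python. Dask already parallelizes part of the work, because each column becomes an independent task that can run concurrently. Splitting a column into more, smaller partitions would not help here, though: every ratio pairs a value with values further down the same column, so calculate_ratios needs each column in a single partition (which is why npartitions=1 is used above).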

import numpy as np
import pandas as pd

def calculate_ratios(data):
    data = data.reset_index(drop=True)
    data_array = data.to_numpy(dtype='float64')
    n = len(data_array)

    # Pre-fill the result matrix with NaN; row i only needs n - 1 - i cells
    results = np.full((n - 1, n - 1), np.nan)

    # Loop once over the rows; each row's divisions are vectorized by NumPy
    for i in range(n - 1):
        results[i, :n - 1 - i] = data_array[i] / data_array[i + 1:]

    # Flatten row by row back into a pandas Series
    final_result = pd.Series(results.ravel())

    # Drop the unused NaN padding (this would also drop genuine NaN ratios such as 0/0)
    return final_result.dropna().reset_index(drop=True)
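If you want to drop the Python loop entirely, a loop-free variant can index every pair (i, j) with i < j at once using np.triu_indices, which yields the upper-triangle pairs in row-major order, the same order the loop-based versions produce. This is a sketch; calculate_ratios_vectorized and calculate_ratios_loop are hypothetical names, and the sample column is the first five values of column 0 from the question.

```python
import numpy as np
import pandas as pd


def calculate_ratios_vectorized(data):
    """Hypothetical loop-free variant: data[i] / data[j] for every i < j."""
    values = np.asarray(data, dtype="float64")
    # Upper-triangle index pairs in row-major order: (0, 1), (0, 2), ..., (1, 2), ...
    i, j = np.triu_indices(len(values), k=1)
    return pd.Series(values[i] / values[j])


def calculate_ratios_loop(data):
    # Reference implementation with the same logic as the pandas version above
    ratios = [data.iloc[i] / data.iloc[i + 1:] for i in range(len(data) - 1)]
    return pd.concat(ratios, axis=0, ignore_index=True)


col = pd.Series([34.04, 35.86, 11.21, 12.40, 1087.51])
vectorized = calculate_ratios_vectorized(col)
looped = calculate_ratios_loop(col)

# Both should contain n * (n - 1) / 2 values in the same order
print(len(vectorized), np.allclose(vectorized.to_numpy(), looped.to_numpy()))
```

The trade-off is memory: the index arrays and the result each hold n(n-1)/2 entries at once, so for very long columns the per-row loop may be the safer choice.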