Using Celery to distribute math processing


Intro

Hey guys, I'm ultra-new to Celery and task queues in general, so I have a question that is probably fairly naive.

I want to take a rather large .csv file (which is converted to a pandas DataFrame) and run a Pearson correlation test (a stats function) across all column pairs. It takes about 9 minutes to run on a single core, and we have hundreds of these .csv files!

So I want to divide this processing among all of the cores on our 3-server cluster. Here's a prototype of my code so far:

from celery import Celery
import numpy as np
import pandas as pd
import scipy.stats as stats
import itertools

app = Celery()

minute_CSV = pd.read_csv('./test_dataframe.csv')

cycle_length = 300
row_max = minute_CSV.shape[0]
r_vector_data = pd.DataFrame()

column_combinations = itertools.combinations(minute_CSV.filter(regex='FREQ').keys(), 2)
xy_cols = list(column_combinations)

@app.task
def data_processing(minute_CSV, cycle_length, row_max, x, y):
    return np.array([stats.pearsonr(minute_CSV[x][c - cycle_length:c],
                                    minute_CSV[y][c - cycle_length:c])[0]
                     for c in range(cycle_length, row_max)])

for x, y in xy_cols:
    r_vector_data[x + ' to ' + y] = data_processing.delay(minute_CSV, cycle_length, row_max, x, y)

r_vector_data.to_csv('processed_dataframe.csv')

When I run this, I get this message:

"[1200 rows x 870 columns] is not JSON serializable"

The Math

The way the Pearson correlation works here is as follows: take 300 (in my case) sequential rows of two columns, run the correlation, and store the result in a new DataFrame (r_vector_data). This is done for rows (0..299), (1..300), (2..301), and so on.
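
To make the indexing concrete, here's the same windowing on a toy DataFrame, without any Celery involved (the column names and data are made up):

import numpy as np
import pandas as pd
import scipy.stats as stats

cycle_length = 300
toy = pd.DataFrame(np.random.randn(1200, 2), columns=['FREQ_A', 'FREQ_B'])

# Window c covers rows (c - cycle_length) .. (c - 1): the first window is
# rows 0..299, the next is rows 1..300, and so on up the frame.
r_values = np.array([stats.pearsonr(toy['FREQ_A'][c - cycle_length:c],
                                    toy['FREQ_B'][c - cycle_length:c])[0]
                     for c in range(cycle_length, toy.shape[0])])

print(r_values.shape)  # (900,) -- one r value per window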

Also, this script only considers one .csv file for now, but will be modified later :).

Thoughts on where to go from here? How would I use Celery to accomplish this? I'm a little lost in the documentation.

Thanks!


Answer

You're seeing the error because Celery is trying to JSON-serialize minute_CSV, and the JSON encoder doesn't know what to do with a pandas DataFrame. By default, every message in Celery is encoded using JSON. See http://docs.celeryproject.org/projects/kombu/en/latest/userguide/serialization.html for more info on that.
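
If you just want the prototype to run, one blunt workaround is to switch the serializer to pickle, which can encode DataFrames. This is only a sketch (the broker URL is a placeholder), and pickle should only be used with a trusted broker, since unpickling can execute arbitrary code:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')  # placeholder broker

# Celery 4+ style settings; on older 3.x versions the equivalents are the
# CELERY_TASK_SERIALIZER, CELERY_RESULT_SERIALIZER and CELERY_ACCEPT_CONTENT
# settings instead.
app.conf.update(
    task_serializer='pickle',
    result_serializer='pickle',
    accept_content=['pickle'],
)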

The better fix is to limit the data transfer: rather than shipping the whole DataFrame, you probably only want to send the relevant rows and columns for each call to your data_processing task, in a form JSON can encode.
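
Here's a rough sketch of what that could look like. The broker/backend URLs are placeholders, rolling_pearson is a renamed stand-in for your task, and it reuses minute_CSV, xy_cols and cycle_length from your script. Each task receives its two columns as plain lists, which JSON can encode, and the caller collects the AsyncResults afterwards:

from celery import Celery
import numpy as np
import pandas as pd
import scipy.stats as stats

app = Celery('tasks',
             broker='redis://localhost:6379/0',   # placeholder URLs
             backend='redis://localhost:6379/1')

@app.task
def rolling_pearson(x_values, y_values, cycle_length):
    # The columns arrive as plain lists; convert back to arrays for scipy.
    x = np.asarray(x_values)
    y = np.asarray(y_values)
    return [float(stats.pearsonr(x[c - cycle_length:c],
                                 y[c - cycle_length:c])[0])
            for c in range(cycle_length, len(x))]

# Client side: send each task only the two columns it needs.
pending = {}
for x_col, y_col in xy_cols:
    pending[x_col + ' to ' + y_col] = rolling_pearson.delay(
        minute_CSV[x_col].tolist(), minute_CSV[y_col].tolist(), cycle_length)

# Block on each result, then assemble and save the output frame.
r_vector_data = pd.DataFrame({name: res.get() for name, res in pending.items()})
r_vector_data.to_csv('processed_dataframe.csv')

This also fixes a second problem in the original loop: delay() returns an AsyncResult, not the computed values, so you need get() (and a configured result backend) before the numbers can go into a DataFrame.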