Intro
Hey guys, I'm ultra-new to Celery and task queues in general, so I have a question that is probably fairly naive.
I want to take a rather large .csv file (which is converted to a pandas DataFrame) and run a Pearson test (a stats function) on it across all column pairs. It takes about 9 minutes to do on a single core, and we have hundreds of these .csv files!
So I want to divide this processing among all of the cores on our 3-server cluster. Here's a prototype of my code thus far ....
from celery import Celery
import numpy as np
import pandas as pd
import scipy.stats as stats
import itertools

app = Celery()

minute_CSV = pd.read_csv('./test_dataframe.csv')

cycle_length = 300
row_max = minute_CSV.shape[0]
r_vector_data = pd.DataFrame()

# Every pair of columns whose name contains 'FREQ'
column_combinations = itertools.combinations(minute_CSV.filter(regex='FREQ').keys(), 2)
xy_cols = list(column_combinations)

@app.task
def data_processing(minute_CSV, cycle_length, row_max, x, y):
    # Rolling Pearson correlation over windows of cycle_length rows
    return np.array([stats.pearsonr(minute_CSV[x][c - cycle_length:c],
                                    minute_CSV[y][c - cycle_length:c])[0]
                     for c in range(cycle_length, row_max)])

for i in range(0, len(xy_cols)):
    x = xy_cols[i][0]
    y = xy_cols[i][1]
    r_vector_data[x + ' to ' + y] = data_processing.delay(minute_CSV, cycle_length, row_max, x, y)

r_vector_data.to_csv('processed_dataframe.csv')
When I run this, I get this message:
"[1200 rows x 870 columns] is not JSON serializable"
The Math
The way the Pearson correlation works is as follows: take 300 (in my case) sequential rows of two columns, run the correlation, and store the result in a new DataFrame (r_vector_data). This is done for rows (0..299), (1..300), (2..301), and so on.
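To make the window indexing concrete, here's a tiny standalone illustration with made-up numbers (nothing to do with Celery) of what one column pair produces:

import numpy as np
import scipy.stats as stats

a = np.array([1.0, 2.0, 3.0, 2.0, 1.0, 0.0])
b = np.array([0.5, 1.5, 2.5, 3.5, 2.5, 1.5])
cycle_length, row_max = 3, len(a)

# c runs over 3, 4, 5, so the windows are rows (0..2), (1..3), (2..4)
r = [stats.pearsonr(a[c - cycle_length:c], b[c - cycle_length:c])[0]
     for c in range(cycle_length, row_max)]
print(r)  # one correlation value per window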
Also, this script only considers one .csv file, but it will be modified later :).
Thoughts on where to go from here? How would I use Celery to accomplish this? I'm a little lost in the documentation.
Thanks!
You're seeing the error because Celery is trying to JSON serialize minute_CSV. By default every message in Celery is encoded using JSON. See http://docs.celeryproject.org/projects/kombu/en/latest/userguide/serialization.html for more info on that.
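If you really do want to ship whole DataFrames to the workers, one option is to switch Celery to the pickle serializer. This is only a rough sketch: the broker URL is a placeholder, the lowercase settings below are the newer-style names (older Celery versions use CELERY_TASK_SERIALIZER, CELERY_RESULT_SERIALIZER and CELERY_ACCEPT_CONTENT), and pickle should only be enabled if you trust everything that can write to your broker.

from celery import Celery

# Broker URL is just a placeholder - point it at whatever your cluster actually runs.
app = Celery('tasks', broker='redis://localhost:6379/0')

# Accept and emit pickled messages so pandas/numpy objects can be passed directly.
# Pickle can execute arbitrary code on deserialization, so only use it on a trusted broker.
app.conf.update(
    task_serializer='pickle',
    result_serializer='pickle',
    accept_content=['pickle', 'json'],
)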
To limit the data transfer, you probably only want to send the relevant rows for each call to your data_processing task.
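For example, sticking with the default JSON serializer, you could send each task just the two columns it needs as plain lists and rebuild the arrays on the worker. This is only a rough sketch reusing the names from your script (minute_CSV, xy_cols, np, stats, app), and it assumes you have a result backend configured so .get() can fetch the answers:

@app.task
def data_processing(x_values, y_values, cycle_length):
    # Plain lists travel fine over JSON; turn them back into arrays on the worker.
    x_arr = np.asarray(x_values)
    y_arr = np.asarray(y_values)
    return [stats.pearsonr(x_arr[c - cycle_length:c],
                           y_arr[c - cycle_length:c])[0]
            for c in range(cycle_length, len(x_arr))]

# Queue one task per column pair, keeping only the AsyncResult handles for now.
async_results = {}
for x, y in xy_cols:
    async_results[x + ' to ' + y] = data_processing.delay(
        minute_CSV[x].tolist(), minute_CSV[y].tolist(), cycle_length)

# .get() blocks until each worker finishes (requires a result backend).
r_vector_data = pd.DataFrame({name: result.get() for name, result in async_results.items()})
r_vector_data.to_csv('processed_dataframe.csv')

Returning a plain list (rather than a numpy array) also keeps the result JSON-serializable, so the default serializer works in both directions.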