Parallelizing Spark's Pandas API Operations


Spark's Pandas API lets Pandas functions run on top of a Spark DataFrame that looks and behaves like a Pandas DataFrame. Pandas has functions that Spark has no implementation for; the one I care about is ewm, which provides an exponential moving average. I'm having trouble figuring out how to run these functions in a way that takes advantage of Spark's distributed processing: the calculation is being performed on a single partition rather than across multiple partitions.
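For reference, here is a minimal pure-Python sketch of what I understand ewm(span=2).mean() to compute with the pandas defaults (adjust=True, no missing values). The helper names ewm_mean and ewm_by_group are made up for illustration and are not part of pandas or Spark. The point is that each name's result depends only on that name's own scores, so in principle the groups could be processed independently.

```python
from collections import defaultdict


def ewm_mean(values, span=2):
    """Exponentially weighted mean, assuming pandas' default adjust=True."""
    alpha = 2.0 / (span + 1)   # pandas derives alpha from span this way
    decay = 1.0 - alpha
    num = 0.0                  # running weighted sum of observations
    den = 0.0                  # running sum of the weights
    out = []
    for x in values:
        num = x + decay * num
        den = 1.0 + decay * den
        out.append(num / den)
    return out


def ewm_by_group(rows, span=2):
    """rows: iterable of (name, score) pairs, already in the desired order."""
    grouped = defaultdict(list)
    for name, score in rows:
        grouped[name].append(score)
    # Each group's series is computed without looking at any other group.
    return {name: ewm_mean(scores, span) for name, scores in grouped.items()}
```

With span=2 the smoothing factor is 2/3, so each older observation is weighted by a further factor of 1/3.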

I'm fairly new to Spark/PySpark, so I pursued a couple of avenues for running a function on a subset of data over a window. I explored UDFs, but as far as I can tell UDFs can only apply non-aggregating functions, so I couldn't get a function applied to a window of data working. After exploring the Pandas API I was able to achieve the functionality I was after, applying the ewm function to my data:

import pandas as pd  # needed for the pd.Series type hints below
from pyspark.sql import functions as F
import pyspark.pandas as ps

def ewm_2(column: pd.Series[float]) -> pd.Series[float]:
    return column.ewm(span=2).mean()

def calculate_pandas_api(df):
    ps.set_option("compute.ops_on_diff_frames", True)
    pdf = df.pandas_api()
    pdf["EWM2"] = pdf.groupby("Name")["Scores"].transform(ewm_2)
    sdf = pdf.to_spark()
    sdf = sdf.repartition("Name")
    return sdf

When the operation was applied, though, I saw a stream of warnings about performance degradation because all of the calculations were being performed on a single partition. The groupby I added certainly achieved the functionality of separating each name for its own ewm calculation, but Spark wasn't able to infer that those operations could be performed independently of each other. I haven't been able to find any resources that provide a solution to this problem. I briefly explored fugue, but quickly realized that I would run into the same aggregation problem I had when trying to write a UDF. I'm not sure what to try next, so I would love some feedback if anyone is willing to correct any misconceptions I have, or give some guidance on how to parallelize this operation through Spark.
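One direction that may be worth checking is the grouped-map route, groupBy(...).applyInPandas(...), which hands each group to a plain pandas function. Below is a rough sketch rather than something I have verified on my data. It assumes the DataFrame has only the columns Name (string) and Scores (double); the function names add_ewm2 and calculate_grouped are placeholders of my own. Row order inside a group is not guaranteed by Spark, so a real version would probably need to sort each group by some ordering column before calling ewm.

```python
def add_ewm2(group):
    # `group` is an ordinary pandas DataFrame holding every row for one Name.
    group = group.copy()
    group["EWM2"] = group["Scores"].ewm(span=2).mean()
    return group


def calculate_grouped(df):
    # `df` is a Spark DataFrame. The output schema is spelled out by hand and
    # assumes exactly these columns and types (an assumption, adjust to fit).
    return df.groupBy("Name").applyInPandas(
        add_ewm2,
        schema="Name string, Scores double, EWM2 double",
    )
```

If this works as I expect, every Name is processed as its own group, so different names could land on different executors instead of being pulled onto a single partition.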
