PySpark: Pandas UDF for scipy statistical transformations


I'm trying to add a column with the standardized (z-score) values of a column x in a Spark DataFrame, but I'm missing something because none of it is working.

Here's my example:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from scipy.stats import zscore

@pandas_udf('float')
def zscore_udf(x: pd.Series) -> pd.Series:
    return zscore(x)

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["id", "x"]
data = [("a", 81.0),
        ("b", 36.2),
        ("c", 12.0),
        ("d", 81.0),
        ("e", 36.3),
        ("f", 12.0),
        ("g", 111.7)]

df = spark.createDataFrame(data=data, schema=columns)

df.show()

df = df.withColumn('y', zscore_udf(df.x))

df.show()

This results in obviously wrong calculations:

+---+-----+----+
| id|    x|   y|
+---+-----+----+
|  a| 81.0|null|
|  b| 36.2| 1.0|
|  c| 12.0|-1.0|
|  d| 81.0| 1.0|
|  e| 36.3|-1.0|
|  f| 12.0|-1.0|
|  g|111.7| 1.0|
+---+-----+----+

Thank you for your help.

1 Answer
How to fix:
Instead of using a UDF, calculate the stddev_pop and the avg over the whole DataFrame and compute the z-score manually. I suggest using a window function over the entire DataFrame for the first step, and then simple arithmetic to get the z-score.
See the suggested code:

from pyspark.sql.functions import avg, col, stddev_pop
from pyspark.sql.window import Window

# An empty partitionBy() puts every row in a single window, so the aggregates are global.
df2 = df \
    .select(
        "*",
        avg("x").over(Window.partitionBy()).alias("avg_x"),
        stddev_pop("x").over(Window.partitionBy()).alias("stddev_x"),
    ) \
    .withColumn("manual_z_score", (col("x") - col("avg_x")) / col("stddev_x"))
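As a side note (my own sketch, not part of the original answer): you can get the same result without a window by aggregating the global statistics once and attaching them with a cross join, which avoids pulling every row into a single window partition.

from pyspark.sql.functions import avg, col, stddev_pop

# Compute the global mean and population stddev once (a 1-row DataFrame)...
stats = df.agg(
    avg("x").alias("avg_x"),
    stddev_pop("x").alias("stddev_x"),
)

# ...then attach them to every row and derive the z-score.
df3 = df.crossJoin(stats) \
    .withColumn("manual_z_score", (col("x") - col("avg_x")) / col("stddev_x"))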

Why didn't the UDF work?
Spark is used for distributed computation. When you perform operations on a DataFrame, Spark distributes the workload into partitions across the available executors/workers.

A pandas_udf is no different. When a pd.Series -> pd.Series UDF runs, some rows are sent to partition X and some to partition Y; zscore is then applied to each partition separately, so it computes the mean and std of that partition's data only and writes the z-score based on that subset.
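If you really do want the scipy-based UDF to see the whole column at once, one workaround (a minimal sketch of mine, not part of the original answer; the dummy group column "grp" and the helper zscore_whole_column are made-up names) is to collapse everything into a single group with applyInPandas. This gives up parallelism, so it is only sensible when the data fits in one executor's memory.

from pyspark.sql.functions import lit
from scipy.stats import zscore

def zscore_whole_column(pdf):
    # pdf holds every row of the group, i.e. the entire DataFrame here
    pdf = pdf.copy()
    pdf["y"] = zscore(pdf["x"])
    return pdf

df_z = df.withColumn("grp", lit(1)) \
    .groupBy("grp") \
    .applyInPandas(zscore_whole_column, schema="id string, x double, grp int, y double") \
    .drop("grp")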

I'll use spark_partition_id to "prove" this per-partition behavior.
Rows a, b, c were mapped to partition 0 while d, e, f, g went to partition 1. I calculated the mean/stddev_pop manually, both for the entire set and per partition, and then calculated the z-score: the UDF's z-score equals the z-score of the partition.

from pyspark.sql.functions import pandas_udf, spark_partition_id, avg, stddev, col, stddev_pop
from pyspark.sql.window import Window

df2 = df \
    .select(
        "*",
        zscore_udf(df.x).alias("z_score"),
        spark_partition_id().alias("partition"),
        avg("x").over(Window.partitionBy(spark_partition_id())).alias("avg_partition_x"),
        stddev_pop("x").over(Window.partitionBy(spark_partition_id())).alias("stddev_partition_x"),
    ) \
    .withColumn("partition_z_score", (col("x") - col("avg_partition_x")) / col("stddev_partition_x"))

df2.show()

+---+-----+-----------+---------+-----------------+------------------+--------------------+
| id|    x|    z_score|partition|  avg_partition_x|stddev_partition_x|   partition_z_score|
+---+-----+-----------+---------+-----------------+------------------+--------------------+
|  a| 81.0|   1.327058|        0|43.06666666666666|28.584533502500186|  1.3270579815484989|
|  b| 36.2|-0.24022315|        0|43.06666666666666|28.584533502500186|-0.24022314955974558|
|  c| 12.0| -1.0868348|        0|43.06666666666666|28.584533502500186| -1.0868348319887526|
|  d| 81.0|  0.5366879|        1|            60.25|38.663063768925504|  0.5366879387524718|
|  e| 36.3|-0.61945426|        1|            60.25|38.663063768925504| -0.6194542714757446|
|  f| 12.0| -1.2479612|        1|            60.25|38.663063768925504|  -1.247961110593097|
|  g|111.7|  1.3307275|        1|            60.25|38.663063768925504|  1.3307274433163698|
+---+-----+-----------+---------+-----------------+------------------+--------------------+

I also added df.repartition(8) prior to the calculation and got results similar to those in the original question: partitions with 0 stddev --> null z-score, partitions with 2 rows --> (-1, 1) z-scores. (See the sketch after the output below.)

+---+-----+-------+---------+---------------+------------------+-----------------+
| id|    x|z_score|partition|avg_partition_x|stddev_partition_x|partition_z_score|
+---+-----+-------+---------+---------------+------------------+-----------------+
|  a| 81.0|   null|        0|           81.0|               0.0|             null|
|  d| 81.0|   null|        0|           81.0|               0.0|             null|
|  f| 12.0|   null|        1|           12.0|               0.0|             null|
|  b| 36.2|   -1.0|        6|          73.95|             37.75|             -1.0|
|  g|111.7|    1.0|        6|          73.95|             37.75|              1.0|
|  c| 12.0|   -1.0|        7|          24.15|12.149999999999999|             -1.0|
|  e| 36.3|    1.0|        7|          24.15|12.149999999999999|              1.0|
+---+-----+-------+---------+---------------+------------------+-----------------+
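For reference, here is a minimal sketch of what "added df.repartition(8) prior to the calculation" could look like (the exact placement is my assumption, not quoted from the answer above):

# Repartition first, then rebuild df2 exactly as in the previous snippet.
# With only 7 rows spread over 8 partitions, each zscore call sees 0, 1 or 2 rows.
df = df.repartition(8)

df2 = df \
    .select(
        "*",
        zscore_udf(df.x).alias("z_score"),
        spark_partition_id().alias("partition"),
        avg("x").over(Window.partitionBy(spark_partition_id())).alias("avg_partition_x"),
        stddev_pop("x").over(Window.partitionBy(spark_partition_id())).alias("stddev_partition_x"),
    ) \
    .withColumn("partition_z_score", (col("x") - col("avg_partition_x")) / col("stddev_partition_x"))

df2.show()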