I'm trying to create a column containing the standardized values (z-scores) of column x in a Spark DataFrame, but I'm missing something, because nothing I've tried works.
Here's my example:
```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from scipy.stats import zscore

@pandas_udf('float')
def zscore_udf(x: pd.Series) -> pd.Series:
    # scipy's zscore uses the population standard deviation (ddof=0)
    # of the values it receives
    return pd.Series(zscore(x))

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns = ["id", "x"]
data = [("a", 81.0),
        ("b", 36.2),
        ("c", 12.0),
        ("d", 81.0),
        ("e", 36.3),
        ("f", 12.0),
        ("g", 111.7)]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
df = df.withColumn('y', zscore_udf(df.x))
df.show()
```
Which results in obviously wrong calculations:
```
+---+-----+----+
| id|    x|   y|
+---+-----+----+
|  a| 81.0|null|
|  b| 36.2| 1.0|
|  c| 12.0|-1.0|
|  d| 81.0| 1.0|
|  e| 36.3|-1.0|
|  f| 12.0|-1.0|
|  g|111.7| 1.0|
+---+-----+----+
```
Thank you for your help.
How to fix:
Instead of using a UDF, calculate the `stddev_pop` and the `avg` of the DataFrame and compute the z-score manually. I suggest using a window function over the entire DataFrame for the first step, and then simple arithmetic to get the z-score.
See the suggested code:
Why didn't the UDF work?
Spark is built for distributed computation. When you perform operations on a DataFrame, Spark splits the data into partitions and distributes the work across the available executors/workers.
`pandas_udf` is no different. When a Series-to-Series (`pd.Series -> pd.Series`) UDF runs, some rows are sent to partition X and some to partition Y. When `zscore` is then run, it calculates the mean and standard deviation of the data in that partition only, so each z-score is based on that partition's data alone.

I'll use `spark_partition_id` to "prove" this. Rows a, b, c were mapped to partition 0, while d, e, f, g were mapped to partition 1. I manually calculated the mean and `stddev_pop` of both the entire dataset and each partition, and then calculated the z-scores. The z-scores from the UDF were equal to the per-partition z-scores.
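The manual check above can be reproduced in plain Python. This is a sketch that assumes the partition layout reported in the answer (a, b, c in partition 0; d, e, f, g in partition 1); `zscores` is a hypothetical helper that mimics `scipy.stats.zscore` with its default `ddof=0`:

```python
import statistics

# Hypothetical helper: population z-scores (ddof=0, like scipy.stats.zscore).
# Returns None where the std is 0 (Spark shows the resulting NaN as null).
def zscores(values):
    mu = statistics.fmean(values)
    sd = statistics.pstdev(values)
    return [None if sd == 0 else (v - mu) / sd for v in values]

x = {"a": 81.0, "b": 36.2, "c": 12.0, "d": 81.0,
     "e": 36.3, "f": 12.0, "g": 111.7}

# Assumed layout, as reported above: partition 0 and partition 1
layout = [["a", "b", "c"], ["d", "e", "f", "g"]]

# z-scores over the whole dataset (what we actually want)
global_z = dict(zip(x, zscores(list(x.values()))))

# z-scores computed separately inside each partition (what the UDF produces)
partition_z = {}
for ids in layout:
    partition_z.update(zip(ids, zscores([x[i] for i in ids])))

for k in x:
    print(k, round(global_z[k], 3), round(partition_z[k], 3))
```

The two columns differ because each partition has its own mean and standard deviation; within each partition the UDF's z-scores still sum to zero, which is why they look plausible at first glance.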
I also added `df.repartition(8)` before the calculation and got results similar to those in the original question: partitions with a standard deviation of 0 give a null z-score, and partitions with 2 rows give z-scores of -1 and 1.
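The ±1 values follow from the arithmetic: for two distinct values p < q, the mean is (p + q) / 2 and the population standard deviation is (q - p) / 2, so their z-scores are exactly -1 and +1. A single-row partition has a standard deviation of 0, so its z-score is 0/0 (NaN), which shows up as null. The sketch below reproduces the question's output under an assumed layout of {a}, {b, c}, {d, e}, {f, g}, inferred from that output rather than reported by Spark; `zscores` is again a hypothetical helper:

```python
import statistics

# Hypothetical helper: population z-scores (ddof=0); None where the std is 0,
# mirroring the null the question shows for a single-row partition.
def zscores(values):
    mu = statistics.fmean(values)
    sd = statistics.pstdev(values)
    return [None if sd == 0 else (v - mu) / sd for v in values]

x = {"a": 81.0, "b": 36.2, "c": 12.0, "d": 81.0,
     "e": 36.3, "f": 12.0, "g": 111.7}

# Assumed layout, inferred from the question's output table
layout = [["a"], ["b", "c"], ["d", "e"], ["f", "g"]]

y = {}
for ids in layout:
    y.update(zip(ids, zscores([x[i] for i in ids])))

print(y)
```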