Is there a way to apply the KS test from the spark.mllib
library in pyspark using the groupBy clause or some method of aggregation?
For example, I have a dataframe df with columns ID and RESULT, like so:
+-------+------+
| ID|RESULT|
+-------+------+
|3648296| 2.73|
|3648296| 9.64|
|3648189| 0.03|
|3648189| 0.03|
|3648296| 2.51|
|3648189| 0.01|
|3648296| 1.75|
|3648296| 30.23|
|3648189| 0.02|
|3648189| 0.02|
|3648189| 0.02|
|3648296| 3.28|
|3648296| 32.55|
|3648296| 2.32|
|3648296| 34.58|
|3648296| 29.22|
|3648189| 0.02|
|3648296| 1.36|
|3648296| 1.64|
|3648296| 1.17|
+-------+------+
There are two IDs, 3648296 and 3648189, and each has on the order of a few hundred thousand corresponding RESULT values.
Is it possible to apply a groupBy function like so:
from pyspark.mllib.stat import Statistics
normtest=df.groupBy('ID').Statistics.kolmogorovSmirnovTest(df.RESULT, "norm", 0, 1)
such that I get an output dataframe like:
+-------+---------+----------+
| ID|p-value |statistic |
+-------+---------+----------+
|3648296|some val | some val |
|3648189|some val | some val |
+-------+---------+----------+
Is this possible?
This can be solved by binning the data and then performing the Kolmogorov-Smirnov test on the binned data (i.e., on the histogram). It won't recover the exact maximum distance, but if your effective distribution is smooth, the result should be close enough.
By bucketing the results, you ensure that only a limited number of items (the number of buckets) is loaded into memory at a time.
First, we need to implement a histogram version of the KS test:
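A minimal sketch of such a function follows, assuming the histogram arrives as NumPy arrays of bin edges and bin counts. The function name ks_test_histogram, the use of SciPy's normal CDF as the reference distribution (matching "norm", 0, 1 in the question), and the asymptotic p-value via scipy.stats.kstwobign are choices of this sketch, not fixed by the approach.

import numpy as np
from scipy import stats

def ks_test_histogram(bin_edges, bin_counts, cdf=stats.norm.cdf):
    """Approximate one-sample KS test computed from a histogram.

    bin_edges  -- array of length n_bins + 1, in ascending order
    bin_counts -- array of length n_bins with the observation count per bin
    cdf        -- reference CDF; standard normal by default
    """
    bin_edges = np.asarray(bin_edges, dtype=float)
    bin_counts = np.asarray(bin_counts, dtype=float)
    n = bin_counts.sum()

    # Empirical CDF evaluated at every bin edge (0 at the leftmost edge).
    ecdf = np.concatenate(([0.0], np.cumsum(bin_counts) / n))

    # Reference CDF at the same edges.
    tcdf = cdf(bin_edges)

    # KS distance restricted to the bin edges: a lower bound on the exact
    # statistic, close to it when the bins are fine enough.
    statistic = float(np.max(np.abs(ecdf - tcdf)))

    # Asymptotic p-value from the limiting Kolmogorov distribution.
    p_value = float(stats.kstwobign.sf(statistic * np.sqrt(n)))
    return statistic, p_value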
Then use it as follows:
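One way to wire this up is sketched below. The 1000 equal-width buckets over [0, 40) are purely illustrative; in practice derive the range from the data, e.g. with df.agg(F.min('RESULT'), F.max('RESULT')).

import numpy as np
from pyspark.sql import functions as F

# Illustrative binning grid: 1000 equal-width buckets over [0, 40).
n_bins = 1000
lo, hi = 0.0, 40.0
edges = np.linspace(lo, hi, n_bins + 1)
width = (hi - lo) / n_bins

# Assign every row to a bucket and count rows per (ID, bucket) on the
# executors; only one small row per non-empty bucket reaches the driver.
bucketed = (df
            .withColumn('bucket', F.floor((F.col('RESULT') - lo) / width))
            .groupBy('ID', 'bucket')
            .count())

# Rebuild a dense histogram per ID on the driver and run the binned KS test.
hists = {}
for row in bucketed.collect():
    counts = hists.setdefault(row['ID'], np.zeros(n_bins))
    b = min(max(int(row['bucket']), 0), n_bins - 1)   # clamp values outside [lo, hi)
    counts[b] += row['count']

for id_, counts in hists.items():
    stat, pval = ks_test_histogram(edges, counts)
    print(id_, stat, pval)

If you want the output as a Spark dataframe with ID, p-value, and statistic columns as in the question, the (id, p-value, statistic) tuples from the final loop can be passed to spark.createDataFrame.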