Find the top N unique values of a column based on the ranking of another column within groups in PySpark

I have a dataframe like below:

import pandas as pd
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame({'region': [1, 1, 1, 1, 1, 1, 2, 2, 2, 3],
                   'store': ['A', 'A', 'C', 'C', 'D', 'B', 'F', 'F', 'E', 'G'],
                   'call_date': ['2022-03-10', '2022-03-09', '2022-03-08', '2022-03-07', '2022-03-06', '2022-03-06',
                                 '2022-03-10', '2022-03-09', '2022-03-08', '2022-03-09']})
df['call_date'] = pd.to_datetime(df['call_date'], format='%Y-%m-%d')
sdf = spark.createDataFrame(df)

+------+-----+-------------------+
|region|store|          call_date|
+------+-----+-------------------+
|     1|    A|2022-03-10 00:00:00|
|     1|    A|2022-03-09 00:00:00|
|     1|    C|2022-03-08 00:00:00|
|     1|    C|2022-03-07 00:00:00|
|     1|    D|2022-03-06 00:00:00|
|     1|    B|2022-03-06 00:00:00|
|     2|    F|2022-03-10 00:00:00|
|     2|    F|2022-03-09 00:00:00|
|     2|    E|2022-03-08 00:00:00|
|     3|    G|2022-03-09 00:00:00|
+------+-----+-------------------+

For each region, I need to find the top 3 unique stores that have been called most recently.

+------+-----+
|region|store|
+------+-----+
|     1|    A|
|     1|    C|
|     1|    D|
|     2|    F|
|     2|    E|
|     3|    G|
+------+-----+

I have tried to generate a rank based on call_date within each region.

sdf.withColumn('RANK',F.dense_rank().over(Window.partitionBy("region").orderBy(F.col('call_date').desc()))).show()

+------+-----+-------------------+----+
|region|store|          call_date|RANK|
+------+-----+-------------------+----+
|     1|    A|2022-03-10 00:00:00|   1|
|     1|    A|2022-03-09 00:00:00|   2|
|     1|    C|2022-03-08 00:00:00|   3|
|     1|    C|2022-03-07 00:00:00|   4|
|     1|    D|2022-03-06 00:00:00|   5|
|     1|    B|2022-03-06 00:00:00|   5|
|     2|    F|2022-03-10 00:00:00|   1|
|     2|    F|2022-03-09 00:00:00|   2|
|     2|    E|2022-03-08 00:00:00|   3|
|     3|    G|2022-03-09 00:00:00|   1|
+------+-----+-------------------+----+

But how can I get the top 3 unique stores based on this rank? Obviously I cannot just take the top 3 ranks from each region, since I need unique stores. I'd appreciate any help; I have thought about this for a long time but still have no clue...
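To make the problem concrete, here is an illustration (not a solution) of why filtering the ranked dataframe above at RANK <= 3 does not work, reusing the sdf, F and Window defined earlier:

ranked = sdf.withColumn(
    'RANK',
    F.dense_rank().over(Window.partitionBy('region').orderBy(F.col('call_date').desc())))
# in region 1 this keeps A (rank 1), A (rank 2) and C (rank 3) -- only two unique stores, D is missed
ranked.filter(F.col('RANK') <= 3).select('region', 'store').show()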

There are 4 answers below.

How about dropping duplicates before you rank?

w = Window.partitionBy(['region', 'store']).orderBy('call_date').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
sdf = (sdf.withColumn('call_date', F.last('call_date').over(w))  # same call_date for every row of a (region, store) pair
       .distinct()                                               # collapse to one row per (region, store)
       .withColumn('RANK', F.row_number().over(Window.partitionBy("region").orderBy(F.col('call_date').desc())))
       .filter(F.col('RANK') <= 3))

sdf.show()

+------+-----+-------------------+----+
|region|store|          call_date|RANK|
+------+-----+-------------------+----+
|     1|    A|2022-03-09 00:00:00|   1|
|     1|    C|2022-03-07 00:00:00|   2|
|     1|    D|2022-03-06 00:00:00|   3|
|     3|    G|2022-03-09 00:00:00|   1|
|     2|    F|2022-03-09 00:00:00|   1|
|     2|    E|2022-03-08 00:00:00|   2|
+------+-----+-------------------+----+
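An equivalent way to collapse to one row per (region, store) is a plain groupBy with max, which avoids the window-plus-distinct step. A minimal sketch, starting again from the original sdf built in the question and the same F and Window imports:

latest = sdf.groupBy('region', 'store').agg(F.max('call_date').alias('call_date'))
top3 = (latest
        .withColumn('RANK', F.row_number().over(
            Window.partitionBy('region').orderBy(F.col('call_date').desc())))
        .filter(F.col('RANK') <= 3)   # top 3 most recently called stores per region
        .drop('RANK'))
top3.show()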

I use both pyspark and pandas, and I can't think of a pandas solution that would beat Spark here.

from pyspark.sql.functions import col, desc, row_number

w = Window.partitionBy('region').orderBy(desc('call_date'))
new = (sdf.dropDuplicates(subset=['store'])                               # drop duplicate stores
       .withColumn('rank', row_number().over(w)).where(col('rank') < 4)  # order and rank
       .drop('rank')                                                      # drop the helper column
      )

new.show()

+------+-----+-------------------+
|region|store|          call_date|
+------+-----+-------------------+
|     1|    A|2022-03-10 00:00:00|
|     1|    C|2022-03-08 00:00:00|
|     1|    D|2022-03-06 00:00:00|
|     2|    F|2022-03-10 00:00:00|
|     2|    E|2022-03-08 00:00:00|
|     3|    G|2022-03-09 00:00:00|
+------+-----+-------------------+
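Note that dropDuplicates does not guarantee which of a store's rows survives, so the call_date used for ranking may not be the most recent one. A sketch of a deterministic variant, assuming the original sdf from the question and the same imports as above: keep only the latest call per (region, store) with a row_number before ranking.

w_latest = Window.partitionBy('region', 'store').orderBy(desc('call_date'))
w_rank = Window.partitionBy('region').orderBy(desc('call_date'))

new = (sdf.withColumn('rn', row_number().over(w_latest))
       .where(col('rn') == 1)                                              # keep only the most recent call per store
       .withColumn('rank', row_number().over(w_rank)).where(col('rank') < 4)
       .drop('rn', 'rank'))
new.show()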

Try with groupby:

>>> df.sort_values("call_date").drop_duplicates("store").groupby("region").apply(lambda x: x.nlargest(3, "call_date")).reset_index(drop=True)

   region store  call_date
0       1     A 2022-03-09
1       1     C 2022-03-07
2       1     D 2022-03-06
3       2     F 2022-03-09
4       2     E 2022-03-08
5       3     G 2022-03-09
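If you want each store represented by its most recent call instead (the ascending sort above keeps the earliest one per store), sorting descending first also works and avoids the apply. A sketch using the same pandas df:

out = (df.sort_values('call_date', ascending=False)
         .drop_duplicates(['region', 'store'])   # keep the most recent call per store
         .groupby('region')
         .head(3)                                 # rows are already ordered by recency within each group
         .reset_index(drop=True))
print(out[['region', 'store']])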

I would do it in 2 steps: first create a column that counts call_date partitioned by both region and store, then use this count column to compute a dense rank, and finally keep only the rows whose rank is less than or equal to 3, as below:

from pyspark.sql import Window as W

(sdf.withColumn('COUNT', F.count(F.col('call_date')).over(W.partitionBy(['region', 'store'])))
    .withColumn('RANK', F.dense_rank().over(W.partitionBy('region').orderBy(F.col('COUNT').desc())))
    .filter(F.col('RANK') <= 3)
    .show())