Aggregate of a column over another column, only for today's date (NULL for other dates), using PySpark


I am trying to aggregate the units over the last 5 days and the last 7 days based on the date. I would like to get the sum of qty as new columns qty_7d and qty_5d over name, but only for today's date, with NULL for all remaining dates. How can I achieve this?


+----+------------------+----------+
|name|               qty|      date|
+----+------------------+----------+
| abc|26017.504534644177|2024-03-07|
|abcd| 52822.92689889294|2024-03-03|
| ace| 8241.794715899114|2024-03-01|
| abc| 7683.035818909406|2024-03-02|
| abc| 8760.503046013466|2024-03-03|
|abcd|               0.0|2024-03-04|
| abe| 15247.23960036059|2024-03-05|
|abc.|               0.0|2024-03-02|
|abcd|23951.349995693676|2024-03-04|
| abe| 72252.24872799969|2024-03-05|
| abc|3394.5305480794277|2024-03-06|
| abe| 25579.26993036305|2024-03-02|
| abe|               0.0|2024-03-07|
|abcd|               0.0|2024-03-01|
+----+------------------+----------+
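In plain Python terms, the goal is: for each name, sum qty over rows whose date falls in the trailing window, and attach that sum only to today's row, leaving every other row as None. A minimal sketch of that logic (toy values loosely based on the table above; "today" is assumed to be 2024-03-07 for illustration):

```python
from datetime import date, timedelta

# Toy rows standing in for the DataFrame above (values rounded).
rows = [
    {"name": "abc", "qty": 26017.5, "date": date(2024, 3, 7)},
    {"name": "abc", "qty": 8760.5,  "date": date(2024, 3, 3)},
    {"name": "abc", "qty": 7683.0,  "date": date(2024, 3, 2)},
]
today = date(2024, 3, 7)  # assumption: stand-in for the real "today"

def trailing_sum(rows, name, days):
    """Sum qty for `name` over the `days`-day window ending today (inclusive)."""
    cutoff = today - timedelta(days=days - 1)
    return sum(r["qty"] for r in rows
               if r["name"] == name and cutoff <= r["date"] <= today)

# qty_7d is filled only on today's row; every other date gets None.
for r in rows:
    r["qty_7d"] = trailing_sum(rows, r["name"], 7) if r["date"] == today else None
```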

I tried to use window functions but it didn't work.

I tried the code below but I am getting an error:

test_df = test_df.withColumn('qty_7d', 
                        F.when(F.col('date') == get_today(), 
                              test_df.groupBy('name').agg(
                                  F.when(F.col("date").isin(last_7_days), sum(F.col('qty')))))
                        .otherwise(F.lit(null)))

Error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-50-631b79e7cd79> in <module>
      2                         F.when(F.col('date') == get_today(),
      3                               test_df.groupBy('name').agg(
----> 4                                   F.when(F.col("today").isin(keys), sum(F.col('qty')))))
      5                         .otherwise(F.lit(null)))

~/miniconda/envs/env_1/lib/python3.6/site-packages/pyspark/sql/column.py in __iter__(self)
    351
    352     def __iter__(self):
--> 353         raise TypeError("Column is not iterable")
    354
    355     # string methods

TypeError: Column is not iterable