How to get the other columns values using a window with rangeBetween in Pyspark

63 Views Asked by At

I have a table like this. I want to get the product_id of the row which has closet purchase_date (checking all rows before current row) and assign it to a new column (ref_id) for current's value for each product name:

enter image description here

Could anyone come up with a solution (spark) with out using udf?

I have tried sth like this: creating a temp column which gets the max purchase date and assign it. If the current row's purchase date equal's to max purchase date value:

self.df = self.df.withColumn("temp_pr_dt",unix_timestamp(col("purchase_date")))
w_B_v4 = Window.partitionBy(product_name).orderBy(col("temp_pr_dt")).rangeBetween(Window.unboundedPreceding,-1)
self.df = self.df.withColumn("max_flag", max("purchase_date").over(w_B_v4))
self.df = self.df.withColumn("ref_id",when(col("max_flag")==col("purchase_date").over(w_B_v4),col("product_id "))
1

There are 1 best solutions below

2
On

You can take the max_by of the desired column within the window:

from pyspark.sql import functions as F

data = [
    (1, 100005, '1/12/2021', 20),
    (1, 100006, '1/20/2021', 30),
    (1, 100007, '1/30/2021', 35),
    (1, 100008, '1/16/2021', 20),
    (1, 100009, '1/16/2021', 25),
    (1, 100010, '1/17/2021', 50),
    (1, 100011, '1/18/2021', 10)
]
columns = ["product_name", "product_id", "purchase_date", "price"]
df = spark.createDataFrame(data, columns)

w_B_v4 = Window.partitionBy('product_name').orderBy('temp_pr_dt').rangeBetween(Window.unboundedPreceding,-1)

df.withColumn('purchase_date', F.to_date('purchase_date', 'M/d/yyyy')) \
  .withColumn('temp_pr_dt', F.unix_timestamp('purchase_date')) \
  .withColumn('ref_id', F.expr('max_by(product_id, temp_pr_dt)').over(w_B_v4)) \
  .show()

Result:

+------------+----------+-------------+-----+----------+------+
|product_name|product_id|purchase_date|price|temp_pr_dt|ref_id|
+------------+----------+-------------+-----+----------+------+
|           1|    100005|   2021-01-12|   20|1610406000|  null|
|           1|    100008|   2021-01-16|   20|1610751600|100005|
|           1|    100009|   2021-01-16|   25|1610751600|100005|
|           1|    100010|   2021-01-17|   50|1610838000|100009|
|           1|    100011|   2021-01-18|   10|1610924400|100010|
|           1|    100006|   2021-01-20|   30|1611097200|100011|
|           1|    100007|   2021-01-30|   35|1611961200|100006|
+------------+----------+-------------+-----+----------+------+