I have a table like this. For each product name, I want to get the product_id of the row with the closest purchase_date (checking only the rows before the current row) and assign it to a new column (ref_id) on the current row:
Could anyone come up with a Spark solution without using a UDF?
I have tried something like this: create a temp column that holds the max purchase date, then assign product_id to ref_id if the current row's purchase date equals that max purchase date value:
from pyspark.sql import Window
from pyspark.sql.functions import col, max, unix_timestamp, when
self.df = self.df.withColumn("temp_pr_dt", unix_timestamp(col("purchase_date")))
w_B_v4 = Window.partitionBy("product_name").orderBy(col("temp_pr_dt")).rangeBetween(Window.unboundedPreceding, -1)
self.df = self.df.withColumn("max_flag", max("purchase_date").over(w_B_v4))
self.df = self.df.withColumn("ref_id", when(col("max_flag") == col("purchase_date"), col("product_id")))
You can take the max_by of the desired column within the window:
Result: