I want to implement logic in Databricks PySpark where I update the next day's value based on the updated values of the last 14 days. I am using a loop to do it. Below is the code, but it is very slow and after a certain point it stops making progress.
```python
import time
from datetime import timedelta
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when

start_time = time.time()

DateList = (score_data.select("AsOfDate").distinct()
            .orderBy("AsOfDate")
            .rdd.flatMap(lambda x: x).collect())

# Keep the original values before any updates
score_data = (score_data
              .withColumn("score_original", col("score"))
              .withColumn("Amount_original", col("Amount")))

for date in DateList:
    rolling_window = date + timedelta(-14)
    print(date)
    # Rows for the current day
    df_current = score_data.filter(col("AsOfDate").isin(date))
    if "rolling_median" in df_current.columns:
        df_current = df_current.drop("rolling_median")
    # Trailing 14-day window, including the current day
    df_rolling = score_data.filter((col("AsOfDate") <= date) & (col("AsOfDate") > rolling_window))
    rolling_median = (df_rolling.filter(col("score").isNotNull())
                      .groupBy("id")
                      .agg(F.expr("(percentile_approx(score, 0.5) + percentile_approx(score, 0.50001)) / 2")
                           .alias("rolling_median")))
    df_current = df_current.join(rolling_median, "id", "left")
    df_current = df_current.withColumn("rolling_gap", F.abs(col("score") - col("rolling_median")))
    # Null out values more than 3 away from the rolling median
    df_current = (df_current
                  .withColumn("score", when(col("rolling_gap") > 3, None).otherwise(col("score")))
                  .withColumn("Amount", when(col("rolling_gap") > 3, None).otherwise(col("Amount"))))
    # Replace the current day's rows with the updated ones
    score_data = score_data.filter(~(col("AsOfDate").isin(date)))
    score_data = score_data.unionByName(df_current, allowMissingColumns=True)

print(f"Execution time: {time.time() - start_time}")
```
Is there a way to improve this code? In the attached data, the green part shows the updated score and the orange part shows the original score. So on the next day, 31-Aug-2023, I want the median to be computed from the updated scores (where outliers have been set to null).

Performing operations on a DataFrame inside a for loop, especially dropping a column, joining, and unioning on every iteration, is why this code is not performant: each pass grows the lineage of the DataFrame, so Spark has to analyze an ever-larger query plan, and the repeated filter/union churns the full dataset once per date. It is also not very easy to understand. Instead, try to see if you can do the following:
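Because each day's median must be computed from the *already-updated* scores, the dependency is sequential in time but independent across `id`s. One way to exploit that is to move the sequential loop inside a per-group function (which you could then hand to `groupBy("id").applyInPandas(...)` in Spark) rather than looping over dates at the driver. The sketch below shows the per-id update in plain pandas; the column names, the 14-day window, and the gap threshold of 3 are taken from your code, but the plain `median()` stands in for your `percentile_approx` trick, and `Amount` handling is omitted for brevity, so treat this as an illustration rather than a drop-in replacement:

```python
import pandas as pd
import numpy as np

def clean_scores(pdf: pd.DataFrame, window_days: int = 14, gap: float = 3.0) -> pd.DataFrame:
    """Sequentially null out scores that lie more than `gap` from the median
    of the already-updated scores in the trailing `window_days` days.
    Operates on one id's rows, sorted by AsOfDate."""
    pdf = pdf.sort_values("AsOfDate").reset_index(drop=True)
    pdf["score_original"] = pdf["score"]
    for i, row in pdf.iterrows():
        lo = row["AsOfDate"] - pd.Timedelta(days=window_days)
        in_window = (pdf["AsOfDate"] <= row["AsOfDate"]) & (pdf["AsOfDate"] > lo)
        window = pdf.loc[in_window, "score"].dropna()  # uses already-updated values
        if window.empty:
            continue
        med = window.median()
        # NaN comparisons are False, so already-nulled rows pass through untouched
        if abs(row["score"] - med) > gap:
            pdf.at[i, "score"] = np.nan
    return pdf
```

In Spark this would run as something like `score_data.groupBy("id").applyInPandas(clean_scores, schema=...)`, which keeps the sequential logic but executes it once per id on the executors, instead of building up a driver-side loop of joins and unions over the whole DataFrame.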