My Spark job runs expensive computations in its first stage, and I checkpoint the resulting RDD so that they don't have to be repeated if executors are preempted (the job runs on YARN with preemption). The job also uses a high value for spark.dynamicAllocation.cachedExecutorIdleTimeout so that the cached results aren't lost.
However, after that first expensive stage the remaining stages are cheap and fast, and I want to free up cluster resources as soon as possible. I've tried changing spark.dynamicAllocation.cachedExecutorIdleTimeout at runtime, after the checkpoint, but it seems to have no effect:
from pyspark import StorageLevel

if sc.master == "yarn":
    sc.setCheckpointDir(f"/user/{sc.sparkUser()}/checkpoints")

first_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
first_rdd.checkpoint()
first_rdd.count()  # action to materialize the cache and the checkpoint

# attempts to lower the timeout after the checkpoint (these seem to have no effect)
sc.getConf().set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "5min")
sc.setLocalProperty("spark.dynamicAllocation.cachedExecutorIdleTimeout", "5min")
Is there a way to change the idle timeout on an existing SparkContext?
Once the SparkContext has been created with its properties, you can't change them the way you did: dynamic-allocation settings are read when the context starts, so your last two lines have no effect.
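If you need a different idle timeout, it has to be supplied before the context exists, i.e. when the session is built. A minimal sketch (the app name and the timeout value are just placeholders):

from pyspark.sql import SparkSession

# Dynamic-allocation settings must be in place when the context is created.
spark = (
    SparkSession.builder
    .appName("expensive-first-stage")  # placeholder name
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "30min")
    .getOrCreate()
)
sc = spark.sparkContext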
What you should do first is look at the difference between persistence and checkpointing; there is a good SO question about it here.
Finally, the best way to resolve your problem is probably to use a DataFrame (or your existing RDD) together with the persist and unpersist methods.
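For example, here is a rough sketch of that idea using the RDD from your question: once the checkpoint has been written to HDFS, unpersist the cached copy so that cachedExecutorIdleTimeout no longer pins executors, and idle ones can be reclaimed after the normal executorIdleTimeout (the follow-up transformation is just a placeholder):

first_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
first_rdd.checkpoint()
first_rdd.count()  # materializes both the cache and the checkpoint

# The data is now safely on HDFS; drop the executor-side cache so that
# cachedExecutorIdleTimeout no longer keeps executors alive. Subsequent
# stages read the RDD back from the checkpoint files instead.
first_rdd.unpersist()

# cheap follow-up stages (placeholder transformation)
result = first_rdd.map(lambda x: x).count()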