No space left on device error in Spark Scala


I have a Spark program that basically reads files from S3, processes them, and writes the output back to S3, then loads a new set of files and does the same thing. I also have 2 small data frames which I create at the start of the program and keep cached throughout.

Problem in detail: In the 1st iteration I load 500 files from S3, join them with the 2 small data frames which I have already cached, do some operations, and write the result back to S3. In the next iteration, I load a fresh 500 files, join them with the same 2 cached data frames, do some operations, and write the results back to S3. During this process a lot of data gets shuffled, which fills up the shuffle storage (scratch space) in the directory "/tmp/spark-local-dir-1".

So my question is: in the 2nd iteration I am loading totally new files, so why is the shuffle data from the 1st iteration still being held and not cleared? Is this expected behavior? Is there a way I can clear it at the end of each iteration?

To free up the space after each iteration I deleted all the contents of the "/tmp/spark-local-dir-1" directory, hoping that the data inside is not required for the 2nd iteration. But even though I loaded new files, Spark still looks for some of the old temp_shuffle files and throws a file-not-found exception. Since I am unable to delete the data, and because of the multiple iterations, the storage fills up very quickly.
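For reference, the per-iteration cleanup I attempted looks roughly like the sketch below, run on the driver after each batch is written (the helper name is made up for illustration; the path is the scratch directory mentioned above):

import java.io.File
import scala.reflect.io.Directory

// Rough sketch of the cleanup attempt: wipe the contents of the scratch
// directory but keep the directory itself (helper name is illustrative only).
def cleanupScratchDir(path: String = "/tmp/spark-local-dir-1"): Unit = {
  val scratch = new Directory(new File(path))
  if (scratch.exists) {
    scratch.list.foreach(_.deleteRecursively()) // remove files and subdirectories
  }
}

It is after this manual deletion that the next iteration starts failing with missing temp_shuffle files.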

Sample Code:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.sum

def main(args: Array[String]): Unit = {

  val session = SparkSession.builder().appName("SparkJob").getOrCreate()

  val filterDF = session.read.format("csv").option("header", "true").load(FilterCSVFilePath)
  filterDF.cache() // caching DF to avoid reading it multiple times

  val noFilterDF = session.read.format("csv").option("header", "true").load(noFilterCSVFilePath)
  noFilterDF.cache() // caching DF to avoid reading it multiple times

  val fileList: List[String] = List("file1.avro", "file2.avro", "file5000.avro")

  fileList.grouped(500).foreach { batch =>
    val data = session.read.format("avro").load(batch: _*) // load 500 files in each iteration
    // join the loaded files with the already cached filterDF and noFilterDF
    val joinedData = data.join(filterDF).join(noFilterDF)
    val results = joinedData.groupBy("x", "y").agg(sum("p")) // data shuffle happens
    results.write.mode(SaveMode.Overwrite).parquet(filePath)
  }
}

I was expecting Spark to clear the shuffle data after every iteration, or at least during its GC cycle, but that doesn't happen; the shuffle space keeps filling up and eventually we get a "No space left on device" error.
