Parallelize Dataframe writing in Spark 1.5


I have the following dataframe:

+-----+-------+-----+------+
|city1|  city2| year| month|
+-----+-------+-----+------+
|  AAA|    QQQ| 2019|     2|
|  BBB|    WWW| 2018|     5|
|  CCC|    RRR| 2019|     2|
|  DDD|    EEE| 2019|     7|
+-----+-------+-----+------+

I want to write the output as CSV into folders partitioned by year and month. For example, the above case will have 3 folder paths as follows:

basepath/year=2018/month=5   (with 1 record)
basepath/year=2019/month=2   (with 2 records)
basepath/year=2019/month=7   (with 1 record)

Unfortunately I have to use Spark 1.5.0 with Scala, which doesn't provide a ready-made way to write CSV output partitioned by the distinct values of chosen columns.

I have the following code as of now:

// Assuming df as the above example

val uniqueYears = df.select("year").distinct().map(_.getString(0)).collect.toList
val uniqueMonths = df.select("month").distinct().map(_.getString(0)).collect.toList

for (year <- uniqueYears) {
  for (month <- uniqueMonths) {
    val outputDf = df.filter((df("year") === year) &&
                             (df("month") === month))
    val outputPath = s"${basePath}/departure_year=${year}/departure_month=${month}"

    outputDf.write.format("com.databricks.spark.csv")
            .option("delimiter", "^")
            .option("nullValue", "")
            .option("treatEmptyValuesAsNulls", "false")
            .save(outputPath)
  }
}

The problem is that the writes happen sequentially: the loop starts with the first year, finishes all of its months, then moves on to the next year, and so on. This adds unnecessary run time to the job, and I'm seeing that every partition takes almost exactly the same time to write even though the data is not spread evenly across the partitions.

Is there any way to parallelize this code so that the CSVs are written to these folders in parallel instead of sequentially?

Please note that I can't use any version of Spark other than 1.5.0.
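
One idea I've been toying with, but haven't verified on 1.5.0, is to collect only the (year, month) pairs that actually occur and then submit the write jobs from parallel driver threads using Scala parallel collections. As far as I understand, Spark's scheduler accepts jobs from multiple threads, so the writes should be able to run concurrently on the cluster. The sketch below reuses the same df, basePath and write options as above; the pool size of 4 is an arbitrary choice.

// Rough sketch, not verified on Spark 1.5.0 / Scala 2.10-2.11.
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Distinct (year, month) combinations present in the data, which also
// avoids writing empty folders for combinations that never occur together.
val yearMonthPairs = df.select("year", "month").distinct()
  .map(r => (r.get(0).toString, r.get(1).toString))
  .collect()
  .toList

val parPairs = yearMonthPairs.par
// Cap the number of concurrent write jobs so the cluster isn't flooded.
parPairs.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))

parPairs.foreach { case (year, month) =>
  val outputDf = df.filter((df("year") === year) &&
                           (df("month") === month))
  val outputPath = s"${basePath}/departure_year=${year}/departure_month=${month}"

  outputDf.write.format("com.databricks.spark.csv")
          .option("delimiter", "^")
          .option("nullValue", "")
          .option("treatEmptyValuesAsNulls", "false")
          .save(outputPath)
}

I'm not sure whether submitting jobs from parallel collections like this is safe on 1.5.0 or whether it actually speeds things up, which is partly why I'm asking.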
