Efficiently merging content split across multiple netCDF files into a single dataframe in Spark 3.4 (Scala)


I am using Spark 3.4. I have data that is split across multiple netCDF files, and I would like to merge them efficiently into one dataframe. I could not get SciSpark to work with Spark 3.x (it does work with Spark 2.x) and I found no helpful documentation for doing this with Apache Sedona, so I am attempting to accomplish the task with the generic netCDF-Java library. I have read data from a single file as shown below:

import ucar.nc2.NetcdfFile
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}

case class Frame(time: Float, dim: Array[Double])

// Schema of the resulting dataframe: one time stamp plus a 3-element coordinate array.
val crdSchema = new StructType()
    .add("time", FloatType)
    .add("dimensions", ArrayType(DoubleType))

val netcdfFile = NetcdfFile.open("file1.nc")

// Read the time variable (index 0) and the flattened coordinate variable (index 5).
val time = netcdfFile.getVariables.get(0).read()
val dim = netcdfFile.getVariables.get(5).read()

val nFrames = time.getSize.toInt

// Copy the time values into a plain Scala array.
val sTime: Array[Float] = new Array[Float](nFrames)
for (i <- 0 until nFrames) {
    sTime(i) = time.getObject(i).asInstanceOf[Float]
}

// Unflatten the coordinates: three consecutive values per frame.
val dimList = Array.ofDim[Double](nFrames, 3)
for (i <- 0 until nFrames) {
    dimList(i)(0) = dim.getObject(3*i + 0).asInstanceOf[Double]
    dimList(i)(1) = dim.getObject(3*i + 1).asInstanceOf[Double]
    dimList(i)(2) = dim.getObject(3*i + 2).asInstanceOf[Double]
}

// Zip the two arrays into rows and build the dataframe on the driver.
val df = spark.createDataFrame(sc.parallelize(sTime.zip(dimList).map(x => Row(x._1, x._2))), crdSchema)

Reading the input files serially (and concatenating the resulting dataframes) would be inefficient, so I would like to know whether there is a more generic way to create a dataframe from multiple netCDF files such that the input files are read in parallel.
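For illustration, here is a rough, untested sketch of the kind of approach I have in mind: distribute the file names with sc.parallelize and let each task open its own file with netCDF-Java on the executor, emit rows, and build the dataframe from the resulting RDD. The file names are placeholders, the variable indices (0 for time, 5 for coordinates) are carried over from my single-file example, and I am assuming the paths are readable from every executor (shared filesystem or identical local copies).

import ucar.nc2.NetcdfFile
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Placeholder file list; paths must be accessible on every executor.
val files = Seq("file1.nc", "file2.nc", "file3.nc")

val crdSchema = new StructType()
    .add("time", FloatType)
    .add("dimensions", ArrayType(DoubleType))

// Distribute the file names; each task opens and decodes its own file(s).
// NetcdfFile is not serializable, so it has to be opened inside the closure.
val rows = sc.parallelize(files, files.size).flatMap { path =>
    val nc = NetcdfFile.open(path)
    try {
        val time = nc.getVariables.get(0).read()
        val dim  = nc.getVariables.get(5).read()
        val nFrames = time.getSize.toInt
        // Range.map is strict, so all values are read before the file is closed.
        (0 until nFrames).map { i =>
            val coords = Array(dim.getDouble(3*i), dim.getDouble(3*i + 1), dim.getDouble(3*i + 2))
            Row(time.getFloat(i), coords)
        }
    } finally {
        nc.close()
    }
}

val dfAll = spark.createDataFrame(rows, crdSchema)

Would something along these lines be a reasonable pattern, or is there a more idiomatic way to do it?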

Thanks in advance for any suggestion(s).
