Generic T as Spark Dataset[T] constructor


In the following snippet, the tryParquet function tries to load a Dataset from a Parquet file if it exists. If not, it computes, persists, and returns the Dataset plan that was provided:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import spark.implicits._ // needed for .toDF and .as[MyRow]

sealed trait CustomRow

case class MyRow(
  id: Int,
  name: String
) extends CustomRow

val ds: Dataset[MyRow] =
  Seq((1, "foo"),
      (2, "bar"),
      (3, "baz")).toDF("id", "name").as[MyRow]

def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
    Try(session.read.parquet(path)) match {
      case Success(df) => df.as[T] // <---- compile error here
      case Failure(_)  => {
        target.write.parquet(path)
        target
      }
    }

val readyDS: Dataset[MyRow] =
    tryParquet(spark, "/path/to/file.parq", ds)

However, this produces a compile error on df.as[T]:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._

Support for serializing other types will be added in future releases.

case Success(df) => df.as[T]

One can work around this by having tryParquet return an untyped DataFrame and letting the caller cast the result to the desired type (see the sketch below). However, is there a solution where the type is managed internally by the function?
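For reference, a minimal sketch of that workaround (the name tryParquetUntyped is made up for illustration) could look like this:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Returns an untyped DataFrame; the caller applies .as[MyRow] afterwards.
def tryParquetUntyped(session: SparkSession, path: String, target: Dataset[_]): DataFrame =
  Try(session.read.parquet(path)) match {
    case Success(df) => df
    case Failure(_)  =>
      target.write.parquet(path)
      target.toDF()
  }

// Caller performs the cast:
// val readyDS: Dataset[MyRow] =
//   tryParquetUntyped(spark, "/path/to/file.parq", ds).as[MyRow]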

There is 1 answer below.

It looks like this is possible by requiring an Encoder context bound on the type parameter:

import org.apache.spark.sql.Encoder

def tryParquet[T <: CustomRow: Encoder](...)

This way the compiler can prove that an Encoder[T] is available when df.as[T] constructs the objects.
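Putting it together, a sketch of the full function with the context bound (same names as in the question) could be:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

// T <: CustomRow : Encoder requires an implicit Encoder[T] at the call site,
// which is exactly what df.as[T] needs in order to compile.
def tryParquet[T <: CustomRow : Encoder](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
  Try(session.read.parquet(path)) match {
    case Success(df) => df.as[T]
    case Failure(_)  =>
      target.write.parquet(path)
      target
  }

// With spark.implicits._ in scope, Encoder[MyRow] is derived automatically:
// val readyDS: Dataset[MyRow] = tryParquet(spark, "/path/to/file.parq", ds)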