Spark custom Kryo encoder not providing schema for UDF

While following along with "How to store custom objects in Dataset?" and trying to register my own Kryo encoder for a DataFrame, I run into this error: Schema for type com.esri.core.geometry.Envelope is not supported

There is a function which parses a WKT string into a geometry object and returns its envelope:

import com.esri.core.geometry.{Envelope, SpatialReference}
import com.esri.core.geometry.ogc.OGCGeometry

def mapWKTToEnvelope(wkt: String): Envelope = {
  val envBound = new Envelope()
  val spatialReference = SpatialReference.create(4326)
  // Parse the WKT string into a geometry object
  val ogcObj = OGCGeometry.fromText(wkt)
  ogcObj.setSpatialReference(spatialReference)
  // Write the geometry's bounding box into envBound
  ogcObj.getEsriGeometry.queryEnvelope(envBound)
  envBound
}

It is wrapped in a UDF like this:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions.udf

implicit val envelopeEncoder: Encoder[Envelope] = Encoders.kryo[Envelope]
val ST_Envelope = udf((wkt: String) => mapWKTToEnvelope(wkt))

The code compiles, but creating the UDF throws the following error at runtime:

[error] Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type com.esri.core.geometry.Envelope is not supported
[error]         at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:733)
[error]         at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:671)
[error]         at org.apache.spark.sql.functions$.udf(functions.scala:3076)

Edit: by contrast, creating a Dataset directly with the same encoder:

val first = df.as[(String, String)].first
val envBound = new Envelope()
val spatialReference = SpatialReference.create(4326)
val ogcObj = OGCGeometry.fromText(first._1)
ogcObj.setSpatialReference(spatialReference)
ogcObj.getEsriGeometry.queryEnvelope(envBound)
spark.createDataset(Seq(envBound))(envelopeEncoder)

works just fine:

root
 |-- value: binary (nullable = true)
+--------------------+
|               value|
+--------------------+
|[01 00 63 6F 6D 2...|
+--------------------+

How can I get it to work in the UDF as well?
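For what it's worth, the stack trace suggests that udf derives the return type's schema through ScalaReflection.schemaFor and never consults the implicit Encoder, which would explain why the Kryo encoder helps with createDataset but not here. One direction that might sidestep this is a typed Dataset.map, which does take an Encoder. The following is only a sketch under assumptions: the object name EnvelopeMapSketch, the local SparkSession settings and the sample WKT string are made up for illustration, and I have not verified it against every Spark version.

```scala
import com.esri.core.geometry.{Envelope, SpatialReference}
import com.esri.core.geometry.ogc.OGCGeometry
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Hypothetical wrapper object, used only to make the sketch self-contained.
object EnvelopeMapSketch {

  // Same logic as the function in the question.
  def mapWKTToEnvelope(wkt: String): Envelope = {
    val envBound = new Envelope()
    val spatialReference = SpatialReference.create(4326)
    val ogcObj = OGCGeometry.fromText(wkt)
    ogcObj.setSpatialReference(spatialReference)
    ogcObj.getEsriGeometry.queryEnvelope(envBound)
    envBound
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for trying this out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("envelope-map-sketch")
      .getOrCreate()
    import spark.implicits._

    val envelopeEncoder: Encoder[Envelope] = Encoders.kryo[Envelope]

    // Illustrative input; in the question this would come from df.
    val wkts: Dataset[String] = Seq("POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))").toDS()

    // Typed map with the Kryo encoder passed explicitly, instead of udf.
    val envelopes: Dataset[Envelope] =
      wkts.map((wkt: String) => mapWKTToEnvelope(wkt))(envelopeEncoder)

    // The result is stored as a single binary column, as in the createDataset case.
    envelopes.printSchema()

    spark.stop()
  }
}
```

The trade-off is that the result is an opaque binary column, so it cannot be used from SQL expressions the way a UDF result column could. If a real UDF is required, returning a supported type (for example the four envelope bounds as doubles) may be the simpler route, though that changes what the column contains.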
