Spark: register expression for SQL DSL


How can I access a catalyst expression (not regular UDF) in spark SQL scala DSL API?

http://geospark.datasyslab.org only allows for text-based execution:

GeoSparkSQLRegistrator.registerAll(sparkSession)
var stringDf = sparkSession.sql(
  """
    |SELECT ST_SaveAsWKT(countyshape)
    |FROM polygondf
  """.stripMargin)

When I try to use the Scala SQL DSL, df.withColumn("foo", ST_Point(col("x"), col("y"))), I get a type mismatch error: expected Column, found ST_Point.

What do I need to change to properly register the catalyst expression as something that is callable directly via the Scala SQL DSL API?

edit

The catalyst expressions are all registered via https://github.com/DataSystemsLab/GeoSpark/blob/fadccf2579e4bbe905b2c28d5d1162fdd72aa99c/sql/src/main/scala/org/datasyslab/geosparksql/UDF/UdfRegistrator.scala#L38:

Catalog.expressions.foreach(f => sparkSession.sessionState.functionRegistry.createOrReplaceTempFunction(f.getClass.getSimpleName.dropRight(1), f))
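As an aside, the dropRight(1) in that loop strips the trailing $ that the Scala compiler appends to the class name of a singleton object, so the registered SQL function name matches the object name. A minimal, dependency-free sketch (the ST_Point here is a stand-in object, not GeoSpark's real expression):

```scala
// Stand-in for a GeoSpark expression's companion object (not the real class).
object ST_Point

// A Scala `object` compiles to a class whose simple name ends in '$',
// e.g. "ST_Point$"; dropRight(1) recovers the SQL-facing name "ST_Point".
val sqlName = ST_Point.getClass.getSimpleName.dropRight(1)
println(sqlName) // prints ST_Point
```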

edit2

import org.apache.spark.sql.geosparksql.expressions.ST_Point
val myPoint = udf((x: Double, y: Double) => ST_Point _)

fails with:

_ must follow method; cannot follow org.apache.spark.sql.geosparksql.expressions.ST_Point.type
There are 2 answers below.

Answer 1:

You can access expressions that aren't exposed in the org.apache.spark.sql.functions package using the expr method. It doesn't actually give you a UDF-like object in Scala, but it does allow you to write the rest of your query using the Dataset API.

Here's an example from the docs:

// get the number of words of each length
df.groupBy(expr("length(word)")).count()
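The same trick applies to GeoSpark's functions once they are registered in the session. For a dependency-free illustration, here is the docs example made runnable with a local SparkSession (assuming only Spark itself is on the classpath):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[1]").appName("expr-demo").getOrCreate()
import spark.implicits._

val df = Seq("spark", "expr", "udf").toDF("word")

// expr compiles a SQL fragment into a Column, so any function known to the
// session's FunctionRegistry (including GeoSpark's, once registered) can be
// used from the Dataset API.
val counts = df.groupBy(expr("length(word)")).count()
counts.show()
```

After registration, the question's case would read df.withColumn("foo", expr("ST_Point(x, y)")) in the same style.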
Answer 2:

Here's another method you can use to call the UDF, and what I've done so far:

import org.apache.spark.sql.functions.{callUDF, col}

df.withColumn("locationPoint",
  callUDF("ST_Point", col("longitude"), col("latitude")))
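ST_Point itself needs GeoSpark on the classpath, but the mechanism is plain Spark: callUDF (in org.apache.spark.sql.functions) looks up a name in the session's function registry, which is exactly where GeoSparkSQLRegistrator puts its expressions. A sketch with a hypothetical make_point UDF standing in for ST_Point:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{callUDF, col}

val spark = SparkSession.builder().master("local[1]").appName("calludf-demo").getOrCreate()
import spark.implicits._

// make_point is a hypothetical stand-in for GeoSpark's ST_Point.
spark.udf.register("make_point", (x: Double, y: Double) => s"POINT($x $y)")

val df = Seq((13.4, 52.5)).toDF("longitude", "latitude")

// callUDF resolves "make_point" in the session's function registry by name,
// the same way it would resolve a registered GeoSpark expression.
val out = df.withColumn("locationPoint",
  callUDF("make_point", col("longitude"), col("latitude")))
out.show(false)
```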