The context is registering a UserDefinedFunction (UDF) in Spark, where the UDF is an anonymous function obtained via reflection. Since the function's signature is only determined at runtime, I was wondering whether this is possible.
Say the function impl() returns an anonymous function:
trait Base {}

class A extends Base {
  def impl(): Function1[Int, String] = new Function1[Int, String] {
    def apply(x: Int): String = "ab" + x.toString
  }
}
val classes = reflections.getSubTypesOf(classOf[Base]).toSet[Class[_ <: Base]].toList
and I obtain the anonymous function in another place:
val clazz = classes(0)
val instance = clazz.newInstance()
val impl = clazz.getDeclaredMethod("impl").invoke(instance)
Now impl holds the anonymous function, but I do not know its signature; I'd like to ask whether we can convert it into a correctly typed function instance:
impl.asInstanceOf[Function1[Int, String]] // How to determine the function signature of the anonymous function, in this case Function1[Int, String]?
Since Scala does not support generic function values, I first considered getting the runtime type of the function:
import scala.reflect.runtime.universe.{TypeTag, typeTag}
def getTypeTag[T: TypeTag](obj: T) = typeTag[T]
val typeList = getTypeTag(impl).tpe.typeArgs
It should return List(Int, String), but I fail to recover the correct function type via reflection.
Update: if the classes are defined as follows:
trait Base {}

class A extends Base {
  def impl(x: Int): String = {
    "ab" + x.toString
  }
}
where impl is now the method itself and we do not know its signature, can impl still be registered?
Normally you register a UDF as follows:
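For example, with impl statically typed as Int => String ("foo" is the UDF name used later in SELECT foo(10)):

spark.udf.register("foo", impl)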
Note the signature of register.
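In Spark's UDFRegistration, the one-argument overload is declared as

def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction

aka, desugaring the context bounds into implicit parameters (the parameter names here are illustrative):

def register[RT, A1](name: String, func: Function1[A1, RT])(implicit rtTag: TypeTag[RT], a1Tag: TypeTag[A1]): UserDefinedFunction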
What TypeTag normally does is persist type information from compile time to runtime. So in order to call register you either have to know the types at compile time or have to know how to construct the type tags at runtime. If you don't have access to how impl() is constructed and you don't have the information about types/type tags at runtime at all, then unfortunately this type information is irreversibly lost to type erasure (Function1[Int, String] is just Function1[_, _] at runtime).

But it's possible that you do have access to how impl() is constructed at runtime and that you do know (at least at runtime) the information about types/type tags. So I assume that you don't have the types Int, String statically and can't call typeTag[Int], typeTag[String] (as I do below), but that you somehow have runtime Type/TypeTag objects.

In such a case you can call register, resolving the implicits explicitly. Well, this doesn't compile directly because of existential types, but you can trick the compiler:
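A minimal sketch of the trick, assuming runtime Type objects argType and retType are at hand (typeToTypeTag is the standard recipe from the "Get a TypeTag from a Type?" link marked (*) further down):

import scala.reflect.runtime.universe
import scala.reflect.runtime.universe._
import scala.reflect.api

// build a TypeTag from a runtime Type (standard recipe, see (*) below)
def typeToTypeTag[T](tpe: Type, mirror: api.Mirror[universe.type]): TypeTag[T] =
  TypeTag(mirror, new api.TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]): U#Type =
      tpe.asInstanceOf[U#Type]
  })

val mirror = runtimeMirror(getClass.getClassLoader)
val argTag = typeToTypeTag[Any](argType, mirror) // statically TypeTag[Any], actually Int
val retTag = typeToTypeTag[Any](retType, mirror) // statically TypeTag[Any], actually String

// the casts are the "trick": the compiler sees Any => Any plus TypeTag[Any]s,
// while the tags carry the real Int/String types to Spark at runtime
spark.udf.register("foo", impl.asInstanceOf[Any => Any])(retTag, argTag)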
https://gist.github.com/DmytroMitin/0b3660d646f74fb109665bad41b3ae9f
Alternatively you can use runtime compilation (creating a new compile time inside the runtime)
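A minimal sketch of this with a ToolBox (assuming clazz has a no-arg constructor and is visible to the toolbox's classloader):

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
// inside this "nested" compile time the expression has a static type,
// so the compiler summons the TypeTags for register itself
tb.eval(tb.parse(
  s"""
     |val spark = org.apache.spark.sql.SparkSession.builder.getOrCreate()
     |spark.udf.register("foo", (new ${clazz.getName}).impl())
  """.stripMargin))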
https://gist.github.com/DmytroMitin/5b5dd4d7db0d0eebb51dd8c16735e0fb
You should provide some code showing how you construct impl() and we'll see whether it's possible to restore the types. See also:

Spark registered a Scala object all of the methods as a UDF
scala cast object based on reflection symbol
Update. After you get

val impl = clazz.getDeclaredMethod("impl").invoke(instance)

it's too late to restore the function types (you can check that typeList is empty). The place where the function type (or type tag) should be captured is somewhere not too far from class A, maybe inside A or outside A, but while Int and String are not lost yet. What TypeTag can do is persist type information from compile time to runtime; it can't restore type information at runtime once it's lost.

https://gist.github.com/DmytroMitin/2ebfae922f8a467d01b6ef18c8b8e5ad
(*) Get a TypeTag from a Type?
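For illustration, a minimal sketch of capturing the tag near A (the implTag member is a hypothetical addition, not part of the question's code):

import scala.reflect.runtime.universe._

trait Base {
  def implTag: TypeTag[_] // hypothetical: persists the types before they are erased
}

class A extends Base {
  def impl(): Int => String = x => "ab" + x.toString
  def implTag: TypeTag[Int => String] = typeTag[Int => String]
}

// later, after the reflective instantiation:
val tag = instance.asInstanceOf[Base].implTag
val List(argType, retType) = tag.tpe.typeArgs // List(Int, String)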
Now spark.sql("""SELECT foo(10)""").show() throws java.io.NotSerializableException, but I guess that's not related to reflection.

Alternatively you can use runtime compilation (instead of manual resolution of implicits and construction of type tags from types):
https://gist.github.com/DmytroMitin/ba469faeca2230890845e1532b36e2a1
One more option is to request the return type of the method impl() as soon as we get class A (outside A):

https://gist.github.com/DmytroMitin/3bd2c19d158f8241a80952c397ee5e09
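A minimal sketch of that option (Scala runtime reflection reads the Scala signature of clazz, where the generic types survive erasure):

import scala.reflect.runtime.universe._

val rm = runtimeMirror(getClass.getClassLoader)
val implMethod = rm.classSymbol(clazz).toType.decl(TermName("impl")).asMethod
val funcType = implMethod.returnType // Function1[Int, String]
val List(argType, retType) = funcType.typeArgs // List(Int, String)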
Update 2. If the methods are defined as in the question's update (i.e. impl(x: Int): String is itself the method), then runtime compilation normally should work along the same lines.
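A hedged sketch of what such toolbox code could look like (the method impl is eta-expanded into a Function1 inside the generated source; the exact variants are in the gist below):

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox

val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
tb.eval(tb.parse(
  s"""
     |val spark = org.apache.spark.sql.SparkSession.builder.getOrCreate()
     |val instance = new ${clazz.getName}
     |spark.udf.register("foo", instance.impl _) // eta-expansion: method -> Function1
  """.stripMargin))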
But now with Spark it produces
ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF

similarly to Spark registered a Scala object all of the methods as a UDF.

https://gist.github.com/DmytroMitin/b0f110f4cf15e2dfd4add70f7124a7b6
But ordinary Scala runtime reflection seems to work
https://gist.github.com/DmytroMitin/763751096fe9cdb2e0d18ae4b9290a54
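A minimal sketch of the reflective route (for readability the recovered types are written as Int and String; the closure captures only a String, so it stays serializable):

val className = clazz.getName // a plain, serializable String
val f: Int => String = (x: Int) => {
  import scala.reflect.runtime.universe._
  // reflecting per call is slow but keeps the closure free of non-serializable state
  val m = runtimeMirror(getClass.getClassLoader)
  val cls = Class.forName(className)
  val method = m.classSymbol(cls).toType.decl(TermName("impl")).asMethod
  m.reflect(cls.newInstance()).reflectMethod(method)(x).asInstanceOf[String]
}
spark.udf.register("foo", f)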
Update 3. One more approach is to use compile-time reflection (macros) rather than runtime reflection if you have enough information at compile time (e.g. if all the classes are known at compile time)
https://gist.github.com/DmytroMitin/6623f1f900330f8341f209e1347a0007
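A hedged sketch of that direction (assuming Base is sealed so that knownDirectSubclasses is reliable; RegisterAll is a hypothetical helper, and after expansion Spark only sees ordinary, statically typed register calls):

import scala.language.experimental.macros
import scala.reflect.macros.blackbox
import org.apache.spark.sql.SparkSession

object RegisterAll {
  // expands into one spark.udf.register call per known subclass of Base
  def apply(spark: SparkSession): Unit = macro implMacro

  def implMacro(c: blackbox.Context)(spark: c.Tree): c.Tree = {
    import c.universe._
    val calls = symbolOf[Base].asClass.knownDirectSubclasses.toList.map { sub =>
      // each generated call is fully statically typed, so TypeTags are summoned normally
      q"$spark.udf.register(${sub.name.decodedName.toString}, (new $sub).impl _)"
    }
    q"..$calls"
  }
}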
Shapeless - How to derive LabelledGeneric for Coproduct (KnownSubclasses)

Update 4. If we replace val clazz = classes.head with classes.foreach(clazz => ...), then the issues with NotSerializableException can be fixed with inlining.

https://gist.github.com/DmytroMitin/c926158a9ff94a6539097c603bbedf6a
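A hedged sketch of the inlining idea (the reflective lookup moves inside the lambda, so the closure captures only the class name instead of clazz/instance):

classes.foreach { clazz =>
  val className = clazz.getName // only this String is captured by the UDF closure
  val f: Int => String = (x: Int) => {
    val cls = Class.forName(className)
    cls.getDeclaredMethod("impl", classOf[Int])
      .invoke(cls.newInstance(), Int.box(x))
      .asInstanceOf[String]
  }
  spark.udf.register(className, f) // the class name doubles as the UDF name here
}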