I am trying to create some simple custom aggregate operators in Spark using Scala.
I have created a simple hierarchy of operators, with the following super-class:
sealed abstract class Aggregator(val name: String) {
type Key = Row // org.apache.spark.sql.Row
type Value
...
}
I also have a companion object, which constructs the appropriate aggregator each time. Observe that each operator is allowed to specify the Value type it wants.
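For illustration, such a factory might look like this (a sketch only: the SUM case, its Double reducer, and the use of a plain String key in place of org.apache.spark.sql.Row — so the snippet runs without Spark — are all assumptions, not code from the question):

```scala
sealed abstract class Aggregator(val name: String) {
  type Key = String // stands in for org.apache.spark.sql.Row here
  type Value
  def reducer(a: Value, b: Value): Value
}

object Aggregator {
  // Hypothetical factory: picks the concrete operator by name.
  def apply(name: String): Aggregator = name match {
    case "SUM" =>
      new Aggregator("SUM") {
        type Value = Double
        def reducer(a: Double, b: Double): Double = a + b
      }
    case other =>
      throw new IllegalArgumentException(s"unknown aggregator: $other")
  }
}
```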
Now the problem arises when I try to call reduceByKey:
val agg = Aggregator("SUM")
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
The error is:
value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]
For my needs, Value can either be a numeric type or a tuple, which is why it has no type bounds. If I replace the Value type declaration with:
type Value = Double
in the Aggregator class, then everything works fine. Therefore, I suppose that the error stems from reduceByKey not knowing the exact Value type at compile time.
Any ideas on how to get around this?
Your RDD cannot be implicitly converted into PairRDDFunctions, because all the implicit ClassTags for the keys and values are missing. You might want to include the class tags as implicit parameters in your Aggregator.
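For example, as implicit constructor parameters (a sketch: Aggregator[K, V] with type parameters replaces the question's abstract type members, and SumAggregator is an illustrative subclass):

```scala
import scala.reflect.ClassTag

sealed abstract class Aggregator[K, V](val name: String)(
    implicit val keyTag: ClassTag[K], val valueTag: ClassTag[V]
) {
  def reducer(a: V, b: V): V
}

// The compiler materializes the ClassTags here, where K and V
// are concrete, so implementors need not spell them out.
final class SumAggregator extends Aggregator[String, Double]("SUM") {
  def reducer(a: Double, b: Double): Double = a + b
}
```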
Or you could declare them as context bounds on the type parameters, which desugars to the same implicit parameters.
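With context bounds the declaration reads more concisely (again a sketch; the tags are captured as plain vals so that call sites can still reach them):

```scala
import scala.reflect.ClassTag

sealed abstract class Aggregator[K: ClassTag, V: ClassTag](val name: String) {
  // Capture the constructor evidence so it can be re-exposed to callers.
  val keyTag: ClassTag[K] = implicitly[ClassTag[K]]
  val valueTag: ClassTag[V] = implicitly[ClassTag[V]]
  def reducer(a: V, b: V): V
}

final class SumAggregator extends Aggregator[String, Double]("SUM") {
  def reducer(a: Double, b: Double): Double = a + b
}
```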
Or maybe even leave them as abstract members. The last variant would shift the responsibility for providing the ClassTags to the implementors of the abstract class. Now, when using an aggregator a of type Aggregator[K, V] in a reduceByKey, you would have to make sure that those implicitly provided class tags are in the current implicit scope.
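Putting the abstract-members variant together with a call site (a sketch: reduceLike is a hypothetical stand-in for Spark's reduceByKey, which likewise pulls ClassTag[K] and ClassTag[V] from the implicit scope when converting an RDD[(K, V)] to PairRDDFunctions):

```scala
import scala.reflect.{ClassTag, classTag}

sealed abstract class Aggregator[K, V](val name: String) {
  // Abstract implicit members: each implementor must provide the tags.
  implicit def keyTag: ClassTag[K]
  implicit def valueTag: ClassTag[V]
  def reducer(a: V, b: V): V
}

final class SumAggregator extends Aggregator[String, Double]("SUM") {
  implicit val keyTag: ClassTag[String] = classTag[String]
  implicit val valueTag: ClassTag[Double] = classTag[Double]
  def reducer(a: Double, b: Double): Double = a + b
}

// Stand-in for reduceByKey: demands the same implicit ClassTags.
def reduceLike[K: ClassTag, V: ClassTag](
    pairs: Seq[(K, V)], f: (V, V) => V): Map[K, V] =
  pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(f) }

def run[K, V](agg: Aggregator[K, V], pairs: Seq[(K, V)]): Map[K, V] = {
  // K and V are abstract here, so the compiler cannot materialize
  // the tags itself; importing the aggregator's members supplies them.
  import agg._
  reduceLike(pairs, agg.reducer)
}
```

Without the import agg._, the call to reduceLike would not compile, because no ClassTag[K] or ClassTag[V] would be in scope.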