I am trying to create some simple custom aggregate operators in Spark using Scala.
I have created a simple hierarchy of operators, with the following super-class:
sealed abstract class Aggregator(val name: String) {
type Key = Row // org.apache.spark.sql.Row
type Value
...
}
I also have a companion object, which constructs the appropriate aggregator each time. Observe that each operator is allowed to specify the Value type it wants.
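For illustration, a minimal sketch of what such a companion object and one concrete operator might look like (the SumAggregator name, the column layout, and the mapper/reducer signatures are my own assumptions, not the actual code):

import org.apache.spark.sql.Row

object Aggregator {
  // constructs the appropriate operator for the given name
  def apply(name: String): Aggregator = name match {
    case "SUM"  => new SumAggregator
    case other  => throw new IllegalArgumentException(s"Unknown aggregator: $other")
  }
}

// a concrete operator fixes its own Value type; mapper and reducer are assumed
// to be the abstract members hidden behind the "..." in the base class
class SumAggregator extends Aggregator("SUM") {
  type Value = Double
  // hypothetical column layout: key struct in column 0, numeric value in column 1
  def mapper(row: Row): (Key, Value) = (row.getStruct(0), row.getDouble(1))
  def reducer(a: Value, b: Value): Value = a + b
}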
Now the problem arises when I try to call reduceByKey:
val agg = Aggregator("SUM")
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
The error is:
value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]
For my needs, Value can be either a numeric type or a tuple, hence its definition has no bounds. If I replace the Value type declaration with:
type Value = Double
in the Aggregator class, then everything works fine. Therefore, I suppose that the error is related to reduceByKey not knowing the exact Value type at compile time.
Any ideas on how to get around this?
Your RDD cannot be implicitly converted into PairRDDFunctions, because all the implicit ClassTags for keys and values are missing. You might want to include the class tags as implicit parameters in your Aggregator:
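One possible shape (a sketch: the type parameters K and V replace the abstract type members Key and Value, and mapper/reducer stand in for the members hidden behind the "..." in the question):

import org.apache.spark.sql.Row
import scala.reflect.ClassTag

sealed abstract class Aggregator[K, V](val name: String)(
    implicit val keyTag: ClassTag[K],
    val valueTag: ClassTag[V]
) {
  def mapper(row: Row): (K, V)
  def reducer(a: V, b: V): V
}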
or maybe:
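for example with context bounds, which desugar to the same implicit parameters (again only a sketch):

sealed abstract class Aggregator[K: ClassTag, V: ClassTag](val name: String) {
  // note: with context bounds the tags are not accessible as members of the instance
  def mapper(row: Row): (K, V)
  def reducer(a: V, b: V): V
}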
or maybe even:
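with the tags as abstract implicit members (a sketch):

sealed abstract class Aggregator[K, V](val name: String) {
  // implementors must supply the ClassTags themselves
  implicit def keyTag: ClassTag[K]
  implicit def valueTag: ClassTag[V]

  def mapper(row: Row): (K, V)
  def reducer(a: V, b: V): V
}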
The last variant would shift the responsibility for providing the ClassTags to the implementor of the abstract class. Now, when using an aggregator a of type Aggregator[K, V] in a reduceByKey, you would have to make sure that those implicitly provided class tags are in the current implicit scope:
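For example, something along these lines (a sketch that assumes the keyTag/valueTag members from the variants above; the aggregate helper and the agg parameter name are purely illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Here K and V are abstract, so the compiler cannot materialize the
// ClassTags on its own; they have to come from the aggregator instance.
def aggregate[K, V](rdd: RDD[Row], agg: Aggregator[K, V]): RDD[(K, V)] = {
  import agg._  // brings keyTag and valueTag into the implicit scope
  rdd
    .map(agg.mapper)
    .reduceByKey(agg.reducer(_, _))
}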