I am new to Scala and just starting to work with RDDs and functional Scala operations.
I am trying to iterate over the values of my pair RDD and return each Var1 together with the average of its Var2 values, computed by the average function defined below, so that the final result is a unique list of Var1 keys, each with a single AvgVar2. I am having a lot of trouble figuring out how to iterate over the values.
*edit: I have the following type declarations:
case class ID: Int, Var1: Int, Var2: Int extends Serializable
I have the following function:
def foo(rdds: RDD[(ID, Iterable[(Var1, Var2)])]): RDD[(Var1, AvgVar2)] = {
  def average(as: Array[Var2]): AvgVar2 = {
    var sum = 0.0
    var i = 0.0
    while (i < as.length) {
      sum += Var2.val
      i += 1
    }
    sum/i
  }
  //My attempt at Scala
  rdds.map(x=> ((x._1),x._2)).groupByKey().map(x=>average(x._1)).collect()
}
My attempt at Scala is trying to do the following:
- Split the RDD pair Iterable into key-value pairs of Var1-Var2.
- Group by the key of Var1 and create an array of associated Var2.
- Apply my average function to each array of Var2.
- Return the AvgVar2 with the associated Var1 as a collection of RDDs.
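The four steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: ID, Var1, and Var2 are taken to be aliases for Int and AvgVar2 an alias for Double (the question does not show valid declarations), the name averageByVar1 is hypothetical, and a local Seq stands in for the RDD so the sketch runs without Spark. On a real pair RDD, the analogous calls would likely be flatMap, groupByKey, and mapValues.

```scala
object GroupAverageSketch {
  // Assumed type aliases; the question's own declarations are not valid Scala.
  type ID = Int
  type Var1 = Int
  type Var2 = Int
  type AvgVar2 = Double

  // Hypothetical helper: a local Seq stands in for the RDD.
  def averageByVar1(rows: Seq[(ID, Iterable[(Var1, Var2)])]): Map[Var1, AvgVar2] =
    rows
      .flatMap(_._2)                 // step 1: flatten to (Var1, Var2) pairs
      .groupBy(_._1)                 // step 2: group the pairs by Var1
      .map { case (var1, pairs) =>   // steps 3 and 4: average Var2 per Var1
        val values = pairs.map(_._2)
        (var1, values.sum.toDouble / values.size)
      }

  def main(args: Array[String]): Unit = {
    // Sample input taken from the question.
    val sample = Seq(
      (1, Seq((1, 3), (1, 12), (1, 6))),
      (2, Seq((2, 5), (2, 7)))
    )
    println(averageByVar1(sample))
  }
}
```

With the sample input from the question, key 1 maps to 7.0 and key 2 maps to 6.0. Flattening first means the outer ID is ignored, so the grouping still works if ID and Var1 ever differ.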
*Edit:
Some sample input data for rdds:
//RDD[(ID,Iterable[(Var1,Var2)...])]
RDD[(1,[(1,3),(1,12),(1,6)])],
RDD[(2,[(2,5),(2,7)])]
Some sample output data:
//RDD[(Var1, AvgVar2)]
RDD[(1,7),(2,6)]
*Edit: Line of working Scala code:
rdd.map(x => (x._2.map(it => it._1).asInstanceOf[Var1], average(x._2.map(it => it._2).toArray)))
Considering ID = Var1, a simple .map() will solve it:
Output:
EDITED (average() with the same signature):
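A minimal sketch of what an average() with the same signature and a simple map could look like. It is not necessarily the code this answer originally contained. It assumes ID, Var1, and Var2 are aliases for Int, AvgVar2 is an alias for Double, ID equals Var1 for every row, and it uses a local Seq in place of the RDD so that it runs without Spark.

```scala
object AverageSketch {
  // Assumed type aliases; not taken from the original post.
  type ID = Int
  type Var1 = Int
  type Var2 = Int
  type AvgVar2 = Double

  // Same signature as in the question, with an Int index and real element access.
  // Returns NaN for an empty array, because 0.0 / 0 is NaN for Doubles.
  def average(as: Array[Var2]): AvgVar2 = {
    var sum = 0.0
    var i = 0
    while (i < as.length) {
      sum += as(i)
      i += 1
    }
    sum / as.length
  }

  // Assumes ID == Var1, so the outer key can be reused directly as Var1.
  def foo(rows: Seq[(ID, Iterable[(Var1, Var2)])]): Seq[(Var1, AvgVar2)] =
    rows.map { case (id, pairs) => (id, average(pairs.map(_._2).toArray)) }

  def main(args: Array[String]): Unit = {
    // Sample input taken from the question.
    val sample = Seq(
      (1, Seq((1, 3), (1, 12), (1, 6))),
      (2, Seq((2, 5), (2, 7)))
    )
    println(foo(sample))
  }
}
```

Because each row already holds all values for one key, no groupByKey is needed under this assumption; the map only has to extract the Var2 values and average them.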