My accumulator is an Array[Array[Int]]. After updating the accumulator in a foreach operation on an RDD, accumulator(0) is as expected, whereas accumulator(1) is Array(0,0,0), i.e. its contents are completely lost.
Inside the RDD, the accumulator value is Array(Array(4,5,6),Array(4,5,6)); outside the RDD, the accumulator value is Array(Array(4,5,6),Array(0,0,0)).
Below is the code:
import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
/**
 * Small Spark driver that exercises a matrix-shaped accumulable: every RDD
 * element writes its three values into each row of a 2x3 accumulator, and the
 * accumulator is printed both inside the task and on the driver afterwards.
 */
object acc {
  // Shape of the accumulator matrix; each RDD element below has ColCount values.
  private val RowCount = 2
  private val ColCount = 3

  /**
   * Renders a matrix by content, e.g. `Array(Array(4, 5, 6), Array(4, 5, 6))`.
   * Concatenating an array to a string directly only yields the JVM identity
   * string (such as `[[I@1b2c3d`), which says nothing about the values.
   */
  private def render(matrix: Array[Array[Int]]): String =
    matrix.map(_.mkString("Array(", ", ", ")")).mkString("Array(", ", ", ")")

  /**
   * Entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    try {
      val a = Array(Array(1, 2, 3), Array(4, 5, 6))
      val rdd = sc.parallelize(a)
      val initialValue = Array.fill[Array[Int]](RowCount)(Array.fill[Int](ColCount)(1))
      val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
      rdd.foreach { x =>
        // Same updates, in the same order, as writing the six `+=` lines by hand:
        // row 0 columns 0..2, then row 1 columns 0..2. The update is passed as an
        // explicit (value, row, column) tuple instead of relying on auto-tupling.
        for (row <- 0 until RowCount; col <- 0 until ColCount) {
          accumulator += ((x(col), row, col))
        }
        println("accumulator value in rdd is" + render(accumulator.localValue))
      }
      println("accumulator value out of rdd is :" + render(accumulator.value))
    } finally {
      // Stop the context even if the job fails.
      sc.stop()
    }
  }
}
/**
 * Accumulable parameter for an `Array[Array[Int]]` matrix whose updates are
 * `(value, row, column)` triples.
 *
 * An update overwrites a single cell; merging two matrices keeps the
 * element-wise maximum.
 */
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int, Int)] {

  /**
   * Returns the starting matrix for an accumulator, with the same contents as
   * `initialValue`.
   *
   * A deep copy is returned rather than `initialValue` itself: `addAccumulator`
   * mutates its matrix in place, so handing back the same instance would let a
   * later update silently change the caller's initial matrix as well.
   *
   * @param initialValue the matrix the accumulator was created with
   * @return a fresh matrix with the same rows and values
   */
  def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] =
    initialValue.map(_.clone())

  /**
   * Writes one update into the matrix.
   *
   * @param acc   matrix to update; it is modified in place
   * @param value `(newValue, rowIndex, columnIndex)`
   * @return the same `acc` instance, after the write
   */
  def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {
    val (cell, row, col) = value
    acc(row)(col) = cell
    acc
  }

  /**
   * Merges two matrices into a new one holding the element-wise maximum.
   *
   * The result has the shape of `m1`. Each row is sized from the matching row
   * of `m1`, so an empty `m1` gives an empty result instead of failing on
   * `m1(0)`, and rows of differing lengths are handled. `m2` must be at least
   * as large as `m1` in every dimension, otherwise an
   * `ArrayIndexOutOfBoundsException` is thrown. Neither input is modified.
   *
   * @param m1 first matrix; determines the shape of the result
   * @param m2 second matrix
   * @return a newly allocated matrix of element-wise maxima
   */
  def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] =
    Array.tabulate(m1.length) { row =>
      Array.tabulate(m1(row).length)(col => math.max(m1(row)(col), m2(row)(col)))
    }
}
Results: inside the RDD, the accumulator value is Array(Array(4,5,6),Array(4,5,6)); outside the RDD, the accumulator value is Array(Array(4,5,6),Array(0,0,0)).
But what I'm expecting outside the RDD is Array(Array(4,5,6),Array(4,5,6)).
The addAccumulator method is called whenever there is an update to the accumulator variable.
In the above code, accumulator += (x(0),0,0) invokes the addAccumulator method.
Once all the tasks are completed, the addInPlace method is called to aggregate the accumulated values from all the tasks.
In the above code, the initial value Array(Array(1, 1, 1), Array(1, 1, 1)) and the task accumulator value Array(Array(4, 5, 6), Array(4, 5, 6)) are passed to the addInPlace method.
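In the above code, the variable i in the addInPlace method has to be reset to 0 each time execution enters the body of the outer loop while (j < columnLength) {. If it is not reset, i already equals rowLength after the first row, so the inner loop is skipped for every later row and those rows keep the default value 0 from Array.ofDim, which is exactly the Array(0,0,0) seen above.
The following code works like a charm.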