Example of using a monoid for distributed computation with Spark

I have user hobby data (RDD[Map[String, Int]]) like:

("food" -> 3, "music" -> 1),
("food" -> 2),
("game" -> 5, "twitch" -> 3, "food" -> 3)

I want to calculate stats over them and represent the stats as a Map[String, Array[Int]], where the array size is 5, like:

("food" -> Array(0, 1, 2, 0, 0),
 "music" -> Array(1, 0, 0, 0, 0),
 "game" -> Array(0, 0, 0, 0, 1),
 "twitch" -> Array(0, 0, 1, 0 ,0))

foldLeft seems like the right tool, but RDD does not support it, and the data is too big to collect into a List/Array just to use foldLeft. How can I do this?

Best answer:

The trick is to replace the Array in your example with a class that holds the statistic you want for some part of the data, and that can be combined with another instance of the same statistic (covering another part of the data) to produce the statistic for the whole data.

For instance, if you have a statistic that covers the data 3, 3, 2 and 5, I gather it would look something like (0, 1, 2, 0, 1), and if you have another instance covering the data 3, 4, 4, it would look like (0, 0, 1, 2, 0). Now all you have to do is define a + operation that lets you combine them: (0, 1, 2, 0, 1) + (0, 0, 1, 2, 0) = (0, 1, 3, 2, 1), covering the data 3, 3, 2, 5 and 3, 4, 4.

Let's just do that, and call the class StatMonoid:

case class StatMonoid(flags: Seq[Int] = Seq(0, 0, 0, 0, 0)) {
    // combine two statistics by adding their counters position by position
    def + (other: StatMonoid): StatMonoid =
        StatMonoid((0 to 4).map(idx => flags(idx) + other.flags(idx)))
}

This class holds the sequence of 5 counters and defines a + operation that lets it be combined with other instances.
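
As a side note, the same combination can be written without explicit indexing by zipping the two counter sequences. Here is an equivalent sketch (the name ZippedStatMonoid is purely illustrative):

    // equivalent variant: pair the counters positionally with zip, then add each pair
    case class ZippedStatMonoid(flags: Seq[Int] = Seq.fill(5)(0)) {
        def + (other: ZippedStatMonoid): ZippedStatMonoid =
            ZippedStatMonoid(flags.zip(other.flags).map { case (a, b) => a + b })
    }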

We also need a convenience method to build it; this could be a constructor in StatMonoid, a factory in the companion object, or just a plain method, as you prefer:

def stat(value: Int): StatMonoid = value match {
    case 1 => StatMonoid(Seq(1, 0, 0, 0, 0))
    case 2 => StatMonoid(Seq(0, 1, 0, 0, 0))
    case 3 => StatMonoid(Seq(0, 0, 1, 0, 0))
    case 4 => StatMonoid(Seq(0, 0, 0, 1, 0))
    case 5 => StatMonoid(Seq(0, 0, 0, 0, 1))
    case _ => throw new RuntimeException(s"illegal init value: $value")
}
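
For what it's worth, the same factory can be written more compactly with Seq.tabulate; this is just a sketch under the same assumption that valid values range from 1 to 5 (the name statCompact is illustrative):

    // build a counter sequence with a single 1 at position value - 1
    def statCompact(value: Int): StatMonoid = {
        require(value >= 1 && value <= 5, s"illegal init value: $value")
        StatMonoid(Seq.tabulate(5)(i => if (i == value - 1) 1 else 0))
    }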

This lets us easily compute an instance of the statistic covering a single piece of data, for example:

scala> stat(4)
res25: StatMonoid = StatMonoid(List(0, 0, 0, 1, 0))

And it lets us combine them simply by adding them:

scala> stat(1) + stat(2) + stat(2) + stat(5) + stat(5) + stat(5)
res18: StatMonoid = StatMonoid(Vector(1, 2, 0, 0, 3))
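
Since + is associative, the grouping of these additions does not matter, which is exactly what lets Spark combine partial results in any order across partitions. A quick check:

    // associativity: grouping does not change the result
    (stat(1) + stat(2)) + stat(5) == stat(1) + (stat(2) + stat(5))  // true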

Now to apply this to your example, let's assume we have the data you mention as an RDD of Map:

val rdd = sc.parallelize(List(
    Map("food" -> 3, "music" -> 1),
    Map("food" -> 2),
    Map("game" -> 5, "twitch" -> 3, "food" -> 3)))

All we need to do to find the stats for each hobby is to flatten the data into ("hobby" -> value) tuples, transform each value into an instance of StatMonoid, and finally combine them all together for each hobby:

rdd.flatMap(_.toList).mapValues(stat).reduceByKey(_ + _).collect

Which yields:

res24: Array[(String, StatMonoid)] = Array((game,StatMonoid(List(0, 0, 0, 0, 1))), (twitch,StatMonoid(List(0, 0, 1, 0, 0))), (music,StatMonoid(List(1, 0, 0, 0, 0))), (food,StatMonoid(Vector(0, 1, 2, 0, 0))))
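
If you want exactly the Map[String, Array[Int]] shape from your question, one more mapValues followed by collectAsMap does it (fine here, since the collected result has only one entry per hobby):

    // final shape requested in the question: Map[String, Array[Int]]
    val statsMap: Map[String, Array[Int]] =
        rdd.flatMap(_.toList)
           .mapValues(stat)
           .reduceByKey(_ + _)
           .mapValues(_.flags.toArray)
           .collectAsMap()
           .toMap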

Now, for the side story: if you wonder why I called the class StatMonoid, it's simply because... it is a monoid :D, and a very common and handy one, namely a product of additive monoids. In short, monoids are just things that can be combined with each other in an associative fashion. They are super common when developing with Spark, since they naturally define operations that can be executed in parallel on the distributed executors and gathered together into a final result.
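
To make the abstraction explicit, here is what the underlying type class could look like; a minimal sketch (the trait and instance names are illustrative, and libraries such as Algebird provide ready-made versions):

    // a minimal Monoid: an associative combine plus an identity element
    trait Monoid[A] {
        def empty: A                   // identity: combine(empty, x) == x
        def combine(x: A, y: A): A     // must be associative
    }

    val statMonoidInstance: Monoid[StatMonoid] = new Monoid[StatMonoid] {
        val empty = StatMonoid()       // all-zero counters
        def combine(x: StatMonoid, y: StatMonoid) = x + y
    }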