Spark: optimize counting occurrences of Strings


I'm taking a string and slicing it so I have just the first letter of that string, then counting how many times that letter occurs for each of the keys (the key, by the way, is the column number). I have code that works, but I don't think it's as optimized as it could be. I say this because in Python I used a function called Counter that removed the need to do the additional .groupByKey() (see the aside after the code).

val firstLetter = stringRDD.map(x => (x._1, x._2.slice(0, 1))) // (column, first letter)
                    .groupBy(identity).mapValues(_.size)       // count each distinct (column, letter) pair
                    .map(x => (x._1._1, (x._1._2, x._2)))      // reshape to (column, (letter, count))
                    .groupByKey().collect()                    // gather all (letter, count)s per column
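
As an aside, the closest one-pass analogue to Python's Counter that I know of in Spark is countByValue, though unlike my code above it materializes every count on the driver (just a sketch of the idea, not something I've benchmarked):

// Counter-style alternative: a single pass, but the entire result map
// comes back to the driver instead of staying distributed.
val counts: scala.collection.Map[(Int, Char), Long] =
  stringRDD.map { case (col, s) => (col, s.head) }.countByValue()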

At the end my output looks like the following (for instance, key 50 has 4488 strings that start with 0, while key 13 has 4 A's, 1 D, and 4483 T's):

firstLetter: Array[(Int, Iterable[(String, Int)])] = Array(
  (50,CompactBuffer((0,4488))), (0,CompactBuffer((D,4488))), 
  (51,CompactBuffer((X,2), (T,4486))), (32,CompactBuffer((6,4488))), 
  (13,CompactBuffer((A,4), (D,1), (T,4483))), (53,CompactBuffer((2,4488))), 
  (54,CompactBuffer((0,4488))), (14,CompactBuffer((T,4488))),       
  (45,CompactBuffer((A,4), (T,4484))), (47,CompactBuffer((2,4488))),   
  (48,CompactBuffer((0,4488))), (49,CompactBuffer((2,4488))))

Sample Data:

res3: Array[(Int, String)] = Array((0,D), (13,D), (14,T), 
  (32,6393050780099594), (45,T), (47,2013-06-17 12:37:29.954597), (48,0), 
  (49,2013-06-17 12:37:29.954597), (50,0), (51,T), (53,2), 
  (54,078009959499), (0,D), (13,A), (14,T), (32,6393050780099586), (45,A),   
  (47,2013-06-17 12:37:29.718432), (48,0), (49,2013-06-17 12:37:29.718432))

Business use case: when I am analyzing millions of records and looking at a file that should only have last names A-C, and I notice that column 13 (last name) has a bunch of names that are not A-C, I can flag that something is wrong. By the same token, column 50 is the account designation, and those always start with 0. A rough sketch of that check is below.
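
For example, with a hypothetical expectedPrefixes map describing which first characters each column is allowed to have, the flagging could look roughly like this (the names and rules here are mine, not from the real pipeline):

// Hypothetical rules: column 13 should only hold last names A-C,
// column 50 account designations starting with 0.
val expectedPrefixes = Map(13 -> ('A' to 'C').toSet, 50 -> Set('0'))

// Flag any column whose letter counts include an unexpected first letter.
val suspicious = firstLetter.filter { case (col, letterCounts) =>
  expectedPrefixes.get(col).exists { valid =>
    letterCounts.exists { case (letter, _) => !valid.contains(letter.head) }
  }
}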

Side note: I can't find much information on it, but what is a CompactBuffer?

2 Answers

Best answer:

How does this work for you?

stringRDD.aggregateByKey(Map[Char, Int]())(
  // Within a partition: bump the count for this value's first character.
  (accum, value) => accum.get(value.head) match {
    case None => accum + (value.head -> 1)
    case Some(count) => accum + (value.head -> (count + 1))
  },
  // Across partitions: merge two per-key count maps.
  (accum1, accum2) => accum1 ++ accum2.map { case (k, v) => k -> (v + accum1.getOrElse(k, 0)) }
)
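
The win over the groupBy(identity) approach is that aggregateByKey builds the per-key maps inside each partition before anything is shuffled, so only the small letter-count maps cross the network. A sketch of running it end to end (the names are just placeholders):

// Same operations as above, pulled out for readability.
val seqOp = (accum: Map[Char, Int], value: String) =>
  accum + (value.head -> (accum.getOrElse(value.head, 0) + 1))
val combOp = (a: Map[Char, Int], b: Map[Char, Int]) =>
  a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0)) }

// Per-key letter counts; only these maps get shuffled, not the raw rows.
val counts: Array[(Int, Map[Char, Int])] =
  stringRDD.aggregateByKey(Map[Char, Int]())(seqOp, combOp).collect()
// e.g. (13, Map(A -> 4, D -> 1, T -> 4483))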

Oh, and a CompactBuffer is Spark's append-only buffer, a leaner alternative to ArrayBuffer that groupByKey uses to hold the grouped values for each key.

Another answer:

This alternative avoids the groupBy(identity) in the question by using reduceByKey, so it should perform better. I don't think we can avoid accumulating the values at the end, since, per the question, all the (letter, count) values need to be organized by key.

// ((column, first letter), 1) for every record
val letterByKey = rdd.map(elem => ((elem._1, elem._2.head), 1))
// Sum the ones, then reshape to (column, (letter, count))
val letterCount = letterByKey.reduceByKey(_ + _).map { case ((key, letter), count) => (key, (letter, count)) }
// Finally gather every (letter, count) pair for each column
val letterCountByKey = letterCount.groupByKey()
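
For what it's worth, collecting the result should give something shaped like the output in the question (the numbers below are illustrative):

letterCountByKey.collect()
// e.g. Array((13, CompactBuffer((T,4483), (A,4), (D,1))),
//            (50, CompactBuffer((0,4488))), ...)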