How to properly use groupBy on Spark RDD composed of case class instances?


I am trying to do a groupBy on an RDD whose elements are instances of a simple case class, and I am getting a weird error that I don't know how to work around. The following code reproduces the problem in the Spark shell (Spark 0.9.0, Scala 2.10.3, Java 1.7.0):

case class EmployeeRec( name : String, position : String, salary : Double ) extends Serializable
   // I suspect extends Serializable is not needed for case classes, but just in case...

val data = sc.parallelize( Vector( EmployeeRec("Ana", "Analyst", 200.0 ),
       EmployeeRec("Maria", "Manager", 250.0 ),
       EmployeeRec("Paul", "Director", 300.0 ) ) )

val groupFun = ( emp : EmployeeRec ) => emp.position

val dataByPos = data.groupBy( groupFun )

The resulting error from the last statement is:

val dataByPos = data.groupBy( groupFun )
<console>:21: error: type mismatch;
found   : EmployeeRec => String
required: EmployeeRec => ?
       val dataByPos = data.groupBy( groupFun )

So I tried:

val dataByPos = data.groupBy[String]( groupFun )

The error is a bit scarier now:

val dataByPos = data.groupBy[String]( groupFun )
<console>:18: error: overloaded method value groupBy with alternatives:
 (f: EmployeeRec => String,p: org.apache.spark.Partitioner)(implicit evidence$8:      scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[(String, Seq[EmployeeRec])] <and>
 (f: EmployeeRec => String,numPartitions: Int)(implicit evidence$7: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[(String, Seq[EmployeeRec])] <and>
 (f: EmployeeRec => String)(implicit evidence$6: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[(String, Seq[EmployeeRec])]
 cannot be applied to (EmployeeRec => String)
     val dataByPos = data.groupBy[String]( groupFun )

I tried to be more specific about which version of the overloaded method groupBy I want to apply by adding the extra argument numPartitions = 10 (of course my real dataset is much bigger than just 3 records):

 val dataByPos = data.groupBy[String]( groupFun, 10 )

I get the exact same error as before.

Any ideas? I suspect the issue might be related to the implicit evidence argument... Unfortunately, this is one of the areas of Scala about which I do not understand much.
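For what it is worth, here is a minimal non-Spark sketch of what that implicit evidence is: the groupBy signature wants a scala.reflect.ClassTag for the key type, and one can summon that evidence and pass it by hand. The evidenceFor helper below is a name of my own, not a Spark API, and the commented-out groupBy call is an untested guess at a workaround, not something I have verified:

```scala
import scala.reflect.ClassTag

// Summon the implicit ClassTag evidence for a type K explicitly.
// (Helper name is mine; it just mirrors groupBy's implicit parameter.)
def evidenceFor[K](implicit tag: ClassTag[K]): ClassTag[K] = tag

val tag = evidenceFor[String]
println(tag.runtimeClass) // the runtime class the evidence carries

// In the Spark shell one could then try passing the evidence by hand
// (untested guess at a workaround):
//   val dataByPos = data.groupBy(groupFun)(tag)
```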

Note 1: The analogous code using tuples instead of the case class EmployeeRec works without any problem. However, I was hoping to use case classes instead of tuples for nicer, more maintainable code that doesn't require me to remember or handle fields by position instead of by name (in reality I have many more than 3 fields per employee).
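For reference, the tuple version I mentioned looks roughly like the sketch below. I show it on a plain Scala Vector rather than an RDD so it runs without Spark; collections' groupBy has the same shape, and the RDD version just wraps the data in sc.parallelize:

```scala
// Tuple-based analogue of the employee records (plain Scala collection;
// the RDD version would be sc.parallelize(...) with the same groupBy call).
val employees = Vector(
  ("Ana", "Analyst", 200.0),
  ("Maria", "Manager", 250.0),
  ("Paul", "Director", 300.0)
)

// Group by the second tuple field (the position); the key type String
// is inferred without any trouble here.
val byPos = employees.groupBy(_._2)

println(byPos("Manager").map(_._1)) // names holding the Manager position
```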

Note 2: It seems that the issue observed when using case class EmployeeRec may be fixed in Spark 1.x, since every version of the code above compiles correctly under the Eclipse Scala plugin when using spark-core_2.10-1.0.0-cdh5.1.0.jar. However, I am not sure how, or whether, I will be able to run that version of Spark on the cluster I have access to, and I was hoping to better understand the problem so as to come up with a workaround for Spark 0.9.0.
