scala + spark: serialize a task for recommendations


I've been working with Scala + Spark and the Movie Recommendation with MLlib tutorial.

After obtaining my predictions I need the top 3 items per user.

val predictions =
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
    (user, product, rate)
  }

I've tried this:

def myPrint(x: (Int, Int, Double)) = println(x._1 + ":" + x._2 + " - " + x._3)

predictions.collect().sortBy(- _._3).groupBy(_._1).foreach( t2 => t2._2.take(3).foreach(myPrint) )

(_._1 is the user, _._2 the item, _._3 the rating)

I had to add the collect() call to make it work; without it the task fails to serialize. By the way, I added the myPrint method because I don't know how to obtain a collection or map from that last line.

Any idea to make it serializable?

Any idea to get a collection/map from last line?

If I can't do better than this, inside myPrint I will write to a database and commit after every 1000 inserts.
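
(For the database fallback, the batching itself is easy to sketch without Spark: inside rdd.foreachPartition you are handed an Iterator, and Iterator.grouped gives fixed-size batches to commit. A minimal sketch, with a hypothetical saveBatch standing in for real JDBC calls:)

```scala
// Hypothetical stand-in for a real database write + commit (not a real API).
def saveBatch(batch: Seq[(Int, Int, Double)]): Unit =
  println(s"committing ${batch.size} rows")

// In Spark this iterator would be the one handed to rdd.foreachPartition { iter => ... }.
val records: Iterator[(Int, Int, Double)] =
  (1 to 2500).iterator.map(i => (i, i * 10, i.toDouble))

// grouped(1000) yields batches of at most 1000 records, so we commit once per batch.
records.grouped(1000).foreach(saveBatch)
```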

Thanks.

3 Answers

Answer (0 votes):

After reading lmm's answer and doing some research, I resolved my problem this way:

First, I began working with Rating objects instead of tuples:

val predictions = model.predict(usersProducts)

Then I defined the function value as follows; the take(3) now happens here:

def myPrint: ((Int, Iterable[Rating])) => Unit = x => x._2.take(3).foreach(println)

So now I put everything together:

predictions.sortBy(- _.rating).groupBy(_.user).foreach(myPrint)

Answer (4 votes):

A task that calls a method has to serialize the object containing the method. Try using a function value instead:

val myPrint: ((Int, Int, Double)) => Unit = x => ...

You don't want the collect() at the start, that defeats the whole point of using Spark.

I don't understand what you're saying about "get a collection/map". .take(3) already returns a collection.
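
(You can see the difference by checking serializability directly with plain Java serialization, which is roughly what Spark does when it ships a task. A minimal sketch; Helper, Standalone, and serializes are made-up names for illustration:)

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Not Serializable: a function obtained from one of its methods captures `this`,
// so the whole Helper instance would have to ship with the task.
class Helper {
  def printTriple(x: (Int, Int, Double)): Unit = println(x._1 + ":" + x._2 + " - " + x._3)
}

object Standalone {
  // A plain function value that captures nothing serializes on its own.
  val printTriple: ((Int, Int, Double)) => Unit = x => println(x._1 + ":" + x._2 + " - " + x._3)
}

// Attempts Java serialization, the same mechanism Spark uses for task closures.
def serializes(obj: AnyRef): Boolean =
  try { new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj); true }
  catch { case _: NotSerializableException => false }

// Eta-expanding the method produces a function that drags the Helper along.
val fromMethod: ((Int, Int, Double)) => Unit = new Helper().printTriple

println(serializes(fromMethod))           // the captured Helper blocks serialization
println(serializes(Standalone.printTriple))
```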

Answer (0 votes):

You could make sure that all the computations are done in RDDs by slightly modifying your approach:

predictions.sortBy(- _.rating).groupBy(_.user)
  .flatMap(_._2.take(3)).foreach(println)
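
(The same pipeline is easy to check on plain Scala collections, as a stand-in for the RDD, with a simplified Rating case class in place of MLlib's:)

```scala
// Simplified stand-in for org.apache.spark.mllib.recommendation.Rating.
case class Rating(user: Int, product: Int, rating: Double)

// Made-up predictions for two users.
val predictions = Seq(
  Rating(1, 10, 4.5), Rating(1, 11, 3.0), Rating(1, 12, 5.0), Rating(1, 13, 2.0),
  Rating(2, 20, 1.0), Rating(2, 21, 4.0)
)

// Sort descending by rating, group by user, keep at most three per user.
val top3 = predictions.sortBy(- _.rating).groupBy(_.user)
  .flatMap { case (_, rs) => rs.take(3) }
  .toSeq

top3.foreach(println)
```

One caveat: on plain collections groupBy preserves the sorted order within each group, so take(3) really is the top three; on an RDD the ordering within groups after a shuffle is not guaranteed, so it may be safer to sort inside each group rather than rely on a global sortBy.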