Spark: Work around nested RDD


There are two tables. The first table has records with two fields, book1 and book2; these are the IDs of books that are usually read together, in pairs. The second table has columns books and readers, where books and readers are book and reader IDs, respectively. For every reader in the second table I need to find the corresponding books in the pairs table. For example, if a reader read books 1, 2, 3 and we have the pairs (1,7), (6,2), (4,10), the resulting list for this reader should contain books 7 and 6.

I first group books by reader and then iterate over the pairs, trying to match each book in a pair against all books in the user's list:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._


object Simple {

  case class Pair(book1: Int, book2: Int)
  case class Book(book: Int, reader: Int, name:String)

  val pairs = Array(
    Pair(1, 2),
    Pair(1, 3),
    Pair(5, 7)
  )

  val testRecs = Array(
    Book(book = 1, reader = 710, name = "book1"),
    Book(book = 2, reader = 710, name = "book2"),
    Book(book = 3, reader = 710, name = "book3"),
    Book(book = 8, reader = 710, name = "book8"),
    Book(book = 1, reader = 720, name = "book1"),
    Book(book = 2, reader = 720, name = "book2"),
    Book(book = 8, reader = 720, name = "book8"),
    Book(book = 3, reader = 730, name = "book3"),
    Book(book = 8, reader = 740, name = "book8")
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Simple")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val pairsDf = sc.parallelize(pairs).toDF()
    val testData = sc.parallelize(testRecs)

    // *** Group test data by reader
    val testByReader = testData.map(r => (r.reader, r.book))
    val testGroups = testByReader.groupByKey()
    val x = testGroups.map(tuple => tuple match {
      case(user, bookIter) => matchList(user,pairsDf, bookIter.toList)
    })
    x.foreach(println)
  }

  def matchList(user:Int, df: DataFrame, toMatch: List[Int]) = {
    //val x = df.map(r => (r(0), r(1))) --- This also fails!!
    //x
    val relatedBooks = df.map(r => {
      val book1 = r(0)
      val book2 = r(1)
      val z = toMatch.map(book =>
        if (book == book1)
          List(book2)
        else {
          if (book == book2) List(book1)
          else List()
        } //if
      )
      z.flatMap(identity)
    })
    (user,relatedBooks)
  }
}

This results in a java.lang.NullPointerException (see below). As I understand it, Spark does not support nested RDDs. Please advise on another way to solve this task.

...
15/06/09 18:59:25 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/09 18:59:25 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:44837
15/06/09 18:59:26 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/09 18:59:26 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
[Stage 0:>                                                          (0 + 0) / 5]15/06/09 18:59:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 5)
java.lang.NullPointerException
    at org.apache.spark.sql.DataFrame.schema(DataFrame.scala:253)
    at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:961)
    at org.apache.spark.sql.DataFrame.map(DataFrame.scala:848)
    at Simple$.matchList(Simple.scala:60)
    at Simple$$anonfun$2.apply(Simple.scala:52)
    at Simple$$anonfun$2.apply(Simple.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

2 Answers

BEST ANSWER

You can create two RDDs, one for the book pairs and one for the reader-book records, and then join the two RDDs by book ID.

val bookpair = Array((1,2),(2,4),(3,4),(5,6),(4,6),(7,3))
val bookpairRdd = sc.parallelize(bookpair)
val readerbook = Array(("foo",1),("bar",2),("user1",3),("user3",4))
val readerRdd = sc.parallelize(readerbook).map(x => x.swap)
val joinedRdd = readerRdd.join(bookpairRdd)
joinedRdd.foreach(println)

(4,(user3,6))
(3,(user1,4))
(2,(bar,4))
(1,(foo,2))
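
Note that this join only matches a reader's book against the first element of each pair. If, as in the original question, a book should match whichever side of a pair it appears on, one possible sketch (reusing the bookpairRdd and readerRdd defined above) is to also emit each pair swapped before joining:

// Sketch: emit each pair in both directions so a book matches either side.
// Reuses bookpairRdd and readerRdd from above; readerRdd is keyed by book ID.
val symmetricPairs = bookpairRdd.union(bookpairRdd.map(_.swap))
val partnersByReader = readerRdd
  .join(symmetricPairs)                                      // (book, (reader, partnerBook))
  .map { case (_, (reader, partner)) => (reader, partner) }
  .groupByKey()                                              // (reader, partner books)
partnersByReader.foreach(println)

With the sample arrays above, user3 (who read book 4) would then pick up books 2 and 3 from the pairs (2,4) and (3,4), in addition to book 6.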
ANSWER

As you've noticed, we can't nest RDDs. One option would be to emit (book, user) pairs, join them with the book-pair info, and then group the results by user ID (grouping by key is a bit sketchy, but assuming no user has read so many books that the related-book list for that user doesn't fit in memory, it should be OK).
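
A rough sketch of that approach against the question's pairs and testRecs arrays (variable names here are just illustrative, and each pair is emitted in both orientations so a match on either side is found):

// Sketch, assuming the pairs and testRecs arrays from the question and an existing SparkContext sc.
// Key the pairs by book (in both directions) and the test records by book, join, then group by reader.
val pairsByBook = sc.parallelize(pairs)
  .flatMap(p => Seq((p.book1, p.book2), (p.book2, p.book1)))  // (book, partnerBook)
val readersByBook = sc.parallelize(testRecs)
  .map(b => (b.book, b.reader))                               // (book, reader)
val relatedByReader = readersByBook
  .join(pairsByBook)                                          // (book, (reader, partnerBook))
  .map { case (_, (reader, partner)) => (reader, partner) }
  .groupByKey()                                               // (reader, related books)
relatedByReader.collect().foreach(println)

With the sample data, reader 710 (books 1, 2, 3, 8) ends up with related books 2, 3, 1, 1; a toSet on the grouped values would deduplicate if needed.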