apache spark - creating RDD from Iterable from groupByKey results


I am trying to create a new RDD based on a given PairRDD. I have a PairRDD with few keys, but each key has a large number (about 100k) of values. I want to somehow repartition and turn each Iterable&lt;V&gt; into an RDD[V] so that I can apply map, reduce, sortBy etc. effectively on those values. I sense that flatMapValues is my friend, but I want to check with other Spark users. This is for a real-time Spark app. I have already tried collect() and computing all measures in memory on the app server, but I am trying to improve upon that. This is what I try (pseudocode):

class ComputeMetrices {

    transient JavaSparkContext sparkContext;

    /**
     * This method computes 3 measures: 2 percentiles of different values and 1 histogram
     * @param javaPairRdd
     * @return
     */
    public Map<String, MetricsSummary> computeMetrices(JavaPairRDD<String, InputData> javaPairRdd) {

      JavaPairRDD<String, MetricsSummary> rdd = javaPairRdd.groupByKey(10).mapValues(itr -> {

        MetricsSummary ms = new MetricsSummary();

        List<Double> list1 = new ArrayList<>();
        List<Double> list2 = new ArrayList<>();

        itr.forEach(input -> { list1.add(input.height); list2.add(input.weight); });

        // Here I want to convert the above lists into RDDs
        JavaRDD<Double> javaRdd1 = sparkContext.parallelize(list1); // null pointer; probably at sparkContext
        JavaRDD<Double> javaRdd2 = sparkContext.parallelize(list2);
        JavaPairRDD<Double, Long> javaPairRdd1 = javaRdd1.sortBy(...).zipWithIndex();
        JavaPairRDD<Double, Long> javaPairRdd2 = javaRdd2.sortBy(...).zipWithIndex();
        // The above two PairRDDs will be used further to find percentile values for the range (0..100)
        // Not writing the percentile algo for the sake of brevity
        double[] percentile1 = ...; // computed from javaPairRdd1
        double[] percentile2 = ...; // computed from javaPairRdd2
        ms.percentile1(percentile1);
        ms.percentile2(percentile2);

        // compute histogram
        JavaDoubleRDD dRdd = sparkContext.parallelizeDoubles(list1);
        long[] hist = dRdd.histogram(10);
        ms.histo(hist);
        return ms;
      });
      return rdd.collectAsMap();
    }
}

I want to create an RDD out of that Iterable from the groupByKey result so that I can use further Spark transformations.

1 Answer

The reason why sparkContext is null is that the code inside your mapValues is executed on a worker: there is no SparkContext available on a worker, it is available only on the driver.

If I understand your code correctly, there is no need to create an RDD inside mapValues if you want it to produce sorted and indexed pairs.
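
For instance, here is a minimal sketch of that idea, reusing the question's javaPairRdd, InputData fields and MetricsSummary setters; computePercentiles is a hypothetical local helper standing in for the omitted percentile algorithm:

JavaPairRDD<String, MetricsSummary> result =
    javaPairRdd.groupByKey(10).mapValues(itr -> {
      MetricsSummary ms = new MetricsSummary();

      // Collect the grouped values into plain lists on the worker - no SparkContext needed.
      List<Double> heights = new ArrayList<>();
      List<Double> weights = new ArrayList<>();
      for (InputData input : itr) {
        heights.add(input.height);
        weights.add(input.weight);
      }

      // Sorting locally replaces the sortBy().zipWithIndex() step: the position of an
      // element in the sorted list is its index.
      Collections.sort(heights);
      Collections.sort(weights);

      // computePercentiles is a hypothetical helper that reads the values at the
      // required positions of a sorted list; a histogram can be built the same way.
      ms.percentile1(computePercentiles(heights));
      ms.percentile2(computePercentiles(weights));
      return ms;
    });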

Please keep in mind that the result of that code would look like:

RDD(String, V) -> groupByKey -> RDD(String, List(V))
-> mapValues -> RDD(String, List((Int, V)))

i.e.

key1, List((0, V1), (0, V2))
key2, List((0, V1), (0, V2))

mapValues is applied to every V inside the grouped List independently, so the counter will always be 0.

If you want to emit multiple (K, V) records out of a single record of (K, List(V)), then flatMapValues will help you. There is still the question of how efficient streaming operations over the new RDD will be: map and reduce will work for sure, but sortBy will depend on the size of your window.

RDD(K, List(V)) -> flatMapValues(x=>x) -> RDD((K, V1), (K, V2) ... )
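
In the Java API that is roughly the following sketch (using the question's javaPairRdd; note that, depending on the Spark version, flatMapValues expects either an Iterable or an Iterator, so the identity function may need to be written as values -> values.iterator()):

// groupByKey gives JavaPairRDD<String, Iterable<InputData>>; flatMapValues with the
// identity function flattens each Iterable back into one (key, value) record per element.
JavaPairRDD<String, InputData> flattened =
    javaPairRdd.groupByKey(10).flatMapValues(values -> values);

// From here the usual per-record transformations apply, e.g. extracting the height
// (a hypothetical follow-up, not from the question):
JavaPairRDD<String, Double> heights = flattened.mapValues(input -> input.height);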