I am trying to create a new RDD based on a given PairRDD. I have a PairRDD with a few keys, but each key has a large number (about 100k) of values. I want to somehow repartition and turn each Iterable<V> into an RDD[V] so that I can apply map, reduce, sortBy etc. effectively on those values. I sense that flatMapValues is my friend, but I want to check with other Spark users. This is for a real-time Spark app. I have already tried collect() and computing all measures in the app server's memory, but I am trying to improve on that.
This is what I have tried (pseudocode):
class ComputeMetrices {
    transient JavaSparkContext sparkContext;

    /**
     * This method computes 3 measures: 2 percentiles of different values and 1 histogram
     * @param javaPairRdd
     * @return a map from key to its computed metrics
     */
    public Map<String, MetricsSummary> computeMetrices(JavaPairRDD<String, InputData> javaPairRdd) {
        JavaPairRDD<String, MetricsSummary> rdd = javaPairRdd.groupByKey(10).mapValues(itr -> {
            MetricsSummary ms = new MetricsSummary();
            List<Double> list1 = new ArrayList<>();
            List<Double> list2 = new ArrayList<>();
            for (InputData d : itr) {
                list1.add(d.height);
                list2.add(d.weight);
            }
            // Here I want to convert the lists above into RDDs
            JavaRDD<Double> javaRdd1 = sparkContext.parallelize(list1); // null pointer; probably at sparkContext
            JavaRDD<Double> javaRdd2 = sparkContext.parallelize(list2);
            JavaPairRDD<Double, Long> javaPairRdd1 = javaRdd1.sortBy(d -> d, true, 10).zipWithIndex();
            JavaPairRDD<Double, Long> javaPairRdd2 = javaRdd2.sortBy(d -> d, true, 10).zipWithIndex();
            // The two PairRDDs above will be used further to find percentile values for the range (0..100)
            // Not writing the percentile algorithm for the sake of brevity
            double[] percentile1; // computed from javaPairRdd1
            double[] percentile2; // computed from javaPairRdd2
            ms.percentile1(percentile1);
            ms.percentile2(percentile2);
            // compute histogram
            JavaDoubleRDD dRdd = sparkContext.parallelizeDoubles(list1);
            long[] hist = dRdd.histogram(10)._2();
            ms.histo(hist);
            return ms;
        });
        return rdd.collectAsMap();
    }
}
I want to create an RDD out of that Iterable from the groupByKey result so that I can use further Spark transformations.
The reason sparkContext is null is that the code inside your mapValues is executed on a worker: no sparkContext is available on a worker, it is available only on the driver.
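For the histogram part in particular, you can fold the per-key bucket counts on the workers with aggregateByKey, with no sparkContext in sight. A minimal sketch, assuming InputData exposes height as in your code, and assuming fixed bucket bounds minHeight/maxHeight (those are placeholders; derive them from your data, e.g. with a prior min/max pass):

    final double minHeight = 0.0, maxHeight = 250.0; // assumed bounds
    final int buckets = 10;
    JavaPairRDD<String, long[]> histograms = javaPairRdd.aggregateByKey(
        new long[buckets],
        (hist, d) -> { // fold one InputData into this partition's counts
            int b = (int) ((d.height - minHeight) / (maxHeight - minHeight) * buckets);
            hist[Math.min(Math.max(b, 0), buckets - 1)]++;
            return hist;
        },
        (h1, h2) -> { // merge partial histograms from different partitions
            for (int i = 0; i < buckets; i++) h1[i] += h2[i];
            return h1;
        });

Spark hands each key a fresh copy of the zero value, so mutating the array in place is safe here.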
If I understand your code correctly, there is no need to create an RDD at all if you want mapValues to produce sorted and indexed pairs.
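For example, a minimal sketch of that (assuming each key's ~100k values fit comfortably in a single task's memory, and using java.util plus scala.Tuple2): sort and index inside mapValues with plain Java collections instead of a nested RDD.

    JavaPairRDD<String, List<Tuple2<Double, Long>>> sortedIndexed =
        javaPairRdd.groupByKey(10).mapValues(itr -> {
            List<Double> heights = new ArrayList<>();
            for (InputData d : itr) {
                heights.add(d.height);
            }
            Collections.sort(heights); // local sort, runs on the worker
            List<Tuple2<Double, Long>> indexed = new ArrayList<>(heights.size());
            for (int i = 0; i < heights.size(); i++) {
                indexed.add(new Tuple2<>(heights.get(i), (long) i)); // value with its rank
            }
            return indexed;
        });

A percentile then becomes a simple index computation into that sorted list.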
Please keep in mind that mapValues is applied to every V inside the grouped list independently, so a counter incremented there would always stay 0.
If you want to emit multiple (K, V) pairs out of a single (K, List<V>) record, then flatMapValues will help you. There is still the question of how efficient streaming operations over the new RDD will be: map and reduce will work for sure, but sortBy would depend on the size of your window.
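A hedged sketch of the flatMapValues route (on older Spark it takes a Function returning an Iterable; recent versions take a FlatMapFunction instead, in which case return heights.iterator()):

    JavaPairRDD<String, Double> heightByKey = javaPairRdd
        .groupByKey(10)
        .flatMapValues(itr -> {
            List<Double> heights = new ArrayList<>();
            for (InputData d : itr) {
                heights.add(d.height);
            }
            return heights; // emits one (key, height) pair per element
        });

heightByKey is an ordinary distributed pair RDD again, so map and reduceByKey behave as usual. Note that in this specific case the same pairs could come straight from javaPairRdd.mapValues(d -> d.height) without grouping at all; flatMapValues earns its keep when you genuinely start from (K, List<V>).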