Need to perform the following join operation in spark
JavaPairRDD<String, Tuple2<Optional<MarkToMarketPNL>, Optional<MarkToMarketPNL>>> finalMTMPNLRDD = openMTMPNL.fullOuterJoin(closedMTMPNL);
To perform this operation i need two JavaPairRDD which are closedMTMPNL and openMTMPNL. OpenMTM and closeMTM are working fine but keyBy on both RDD are giving error at runtime.
JavaPairRDD<String,MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL,String>(){
public String call(MarkToMarketPNL mtm) throws Exception
{
return mtm.getTaxlot();
}
});
JavaPairRDD<String,MarkToMarketPNL> closedMTMPNL = closedMTM.keyBy(new Function<MarkToMarketPNL,String>(){
public String call(MarkToMarketPNL mtm) throws Exception
{
return mtm.getTaxlot();
}
});
Is there any other way in which i can join openMTM and closeMTM RDD's? As of now trying to get two RDD's on which the join can be performed on String. What causing the exception to occur??
Attaching the stack trace
java.lang.NullPointerException
15/06/28 01:19:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53)
at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89)
at scala.collection.AbstractIterable.toIterator(Iterable.scala:54)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
15/06/28 01:19:30 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53)
at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89)
at scala.collection.AbstractIterable.toIterator(Iterable.scala:54)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
I think the error is not in the code that you included in the question. Spark is trying to run
counton an RDD. The code you included does not callcount, so that's one sign. But the exception suggests that the RDD being counted has an iterator that was created in Java and is now being converted to a Scala iterator. At that point it turns out this iterator is in factnull.Does your code produce an iterator somewhere? Perhaps in a
mapPartitionscall or some such?