here is my code:
val bg = imageBundleRDD.first() //bg:[Text, BundleWritable]
val res= imageBundleRDD.map(data => {
val desBundle = colorToGray(bg._2) //lineA:NotSerializableException: org.apache.hadoop.io.Text
//val desBundle = colorToGray(data._2) //lineB:everything is ok
(data._1, desBundle)
})
println(res.count)
lineB goes well but lineA shows that:org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.io.Text
I try to use use Kryo to solve my problem but it seems nothing has been changed:
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[Text])
kryo.register(classOf[BundleWritable])
}
}
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...
Thanks!!!
In Apache Spark while dealing with Sequence files, we have to follow these techniques:
-- Use Java equivalent Data Types in place of Hadoop data types. -- Spark Automatically converts the Writables into Java equivalent Types. Ex:- We have a sequence file "xyz", here key type is say Text and value is LongWritable. When we use this file to create an RDD, we need use their java equivalent data types i.e., String and Long respectively. val mydata = = sc.sequenceFile[String, Long]("path/to/xyz") mydata.collect