unable to create dataframe from sequence file in Spark created by Sqoop


I want to read the orders data, which is stored as a sequence file in HDFS on the Cloudera VM, and create an RDD out of it. Below are my steps:

1) Import the orders data as a sequence file:

sqoop import --connect jdbc:mysql://localhost/retail_db --username retail_dba --password cloudera  --table orders -m 1 --target-dir /ordersDataSet --as-sequencefile   

2) Read the file in Spark (Scala):

Spark 1.6

val sequenceData=sc.sequenceFile("/ordersDataSet",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]).map(rec => rec.toString())  

3) When I try to read data from the above RDD, it throws the error below:

Caused by: java.io.IOException: WritableName can't load class: orders
    at org.apache.hadoop.io.WritableName.getClass(WritableName.java:77)
    at org.apache.hadoop.io.SequenceFile$Reader.getValueClass(SequenceFile.java:2108)
    ... 17 more
Caused by: java.lang.ClassNotFoundException: Class orders not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2185)
    at org.apache.hadoop.io.WritableName.getClass(WritableName.java:75)
    ... 18 more

I don't know why it says it can't find orders. Where am I going wrong?

I referred to the code from these two links as well, but no luck:
1) Refer sequence part
2) Refer step no. 8


There are 2 answers below.

BEST ANSWER

I figured out the solution to my own problem. It is a lengthy explanation, but I hope it makes sense.

1) When I tried to read the data that was imported into HDFS using Sqoop, it gave an error for the following reasons:

A) A sequence file is all about key-value pairs. When I import the data using Sqoop, it is not imported as key-value pairs, which is why reading it throws an error.
B) If you read the first few characters of the file, from which you can figure out the two classes that need to be passed when reading the sequence file, you get data as below:

[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/problem5/sequence/pa* | head -c 300
SEQ!org.apache.hadoop.io.LongWritableorders�;�M��c�K�����@���-OCLOSED@���PENDING_PAYMENT@���/COMPLETE@���"{CLOSED@���cat: Unable to write to output stream.  

Above you can see only one class, i.e. org.apache.hadoop.io.LongWritable, and when I pass it while reading the sequence data it throws the error mentioned in the post:

val sequenceData=sc.sequenceFile("/ordersDataSet",classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.LongWritable]).map(rec => rec.toString())  

I don't think point B is the main reason for the error, but I am quite sure point A is the real culprit.
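
As an aside, instead of eyeballing the raw bytes, the key and value class names can also be read directly from the sequence file header. A minimal sketch (not part of the original solution; the part file name is hypothetical):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.SequenceFile

// Open the Sqoop-imported part file and ask the header for its key/value class names.
val reader = new SequenceFile.Reader(sc.hadoopConfiguration,
  SequenceFile.Reader.file(new Path("/ordersDataSet/part-m-00000")))
println(reader.getKeyClassName)    // org.apache.hadoop.io.LongWritable
println(reader.getValueClassName)  // orders (the Sqoop-generated record class)
reader.close()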

2) Here is how I solved my problem.

I imported the data as an Avro data file into another destination using Sqoop (with --as-avrodatafile). Then I created a DataFrame from the Avro data as follows:

scala> import com.databricks.spark.avro._;
scala> val avroData=sqlContext.read.avro("path")  
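
(This assumes the spark-avro package is available in the shell, e.g. spark-shell started with something like --packages com.databricks:spark-avro_2.10:2.0.1, or a similar version matching the setup.)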

Then I created key-value pairs and saved them as a sequence file:

avroData.map(p=>(p(0).toString,(p(0)+"\t"+p(1)+"\t"+p(2)+"\t"+p(3)))).saveAsSequenceFile("/user/cloudera/problem5/sequence")  

Now when I read the first few characters of the newly written file, it shows the two classes I need to pass when reading the file:

[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/problem5/sequence/part-00000 | head -c 300
SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text^#%���8P���11  1374735600000   11599   CLOSED&2#2  1374735600000   256 PENDING_PAYMENT!33  1374735600000   12111   COMPLETE44  1374735600000   8827    CLOSED!55   1374735600000   11318   COMPLETE 66 1374cat: Unable to write to output stream.  

scala> val sequenceData=sc.sequenceFile("/user/cloudera/problem5/sequence",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]).map(rec=>rec.toString)
sequenceData: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at map at <console>:30

Now when I print the data, it displays as below:

scala> sequenceData.take(4).foreach(println)
(1,1    1374735600000   11599   CLOSED)
(2,2    1374735600000   256 PENDING_PAYMENT)
(3,3    1374735600000   12111   COMPLETE)
(4,4    1374735600000   8827    CLOSED)
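
As a side note, since the question title asks for a DataFrame, the value string can also be split into columns. A minimal sketch (not part of the original solution), assuming the tab-separated layout shown above; the column names are my own guesses based on the orders table:

import sqlContext.implicits._

// Split each tab-separated value into typed columns and build a DataFrame (Spark 1.6 style).
// The column names are assumptions, not taken from the original post.
val ordersDF = sc.sequenceFile("/user/cloudera/problem5/sequence",
    classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])
  .map { case (_, v) =>
    val f = v.toString.split("\t")
    (f(0).toInt, f(1).toLong, f(2).toInt, f(3))
  }
  .toDF("order_id", "order_date", "order_customer_id", "order_status")

ordersDF.show(4)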

Last but not least, thank you everyone for your much appreciated efforts. Cheers!!

ANSWER

Sqoop has little to do with it. Here is an example of a more realistic scenario, in which saveAsSequenceFile always assumes (k, v) pairs; this may help you:

import org.apache.hadoop.io._

// Build an RDD of (key, value) pairs; saveAsSequenceFile requires pairs.
val RDD = sc.parallelize( List( (1, List("A", "B")) , (2, List("B", "C")) , (3, List("C", "D", "E")) ) )
val RDD2 = RDD.map(x => (x._1, x._2.mkString("/")))
RDD2.saveAsSequenceFile("/rushhour/seq-directory/2")

// Read it back, declaring the Writable key/value classes, and unpack the joined value.
val sequence_data = sc.sequenceFile("/rushhour/seq-directory/*", classOf[IntWritable], classOf[Text])
                  .map{case (x, y) => (x.get(), y.toString().split("/")(0), y.toString().split("/")(1))}

sequence_data.collect

returns:

res20: Array[(Int, String, String)] = Array((1,A,B), (2,B,C), (3,C,D), (1,A,B), (2,B,C), (3,C,D))

I am not sure whether you want an RDD or a DF, but converting an RDD to a DF is of course trivial.
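
For instance, a minimal sketch of that conversion in Spark 1.6 (the column names are made up for illustration):

import sqlContext.implicits._

// sequence_data is the RDD[(Int, String, String)] built above.
val df = sequence_data.toDF("id", "first", "second")
df.show()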