I am trying to load a CSV file with a custom schema, but every time I end up with the following error:

Project_Bank.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [110, 111, 13, 10]

These are my CSV file entries, followed by my program:

age;job;marital;education;default;balance;housing;loan;contact;day;month;duration;campaign;pdays;previous;poutcome;y
58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no

My code:

$ spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val bankSchema = StructType(Array(
  StructField("age", IntegerType, true),
  StructField("job", StringType, true),
  StructField("marital", StringType, true),
  StructField("education", StringType, true),
  StructField("default", StringType, true),
  StructField("balance", IntegerType, true),
  StructField("housing", StringType, true),
  StructField("loan", StringType, true),
  StructField("contact", StringType, true),
  StructField("day", IntegerType, true),
  StructField("month", StringType, true),
  StructField("duration", IntegerType, true),
  StructField("campaign", IntegerType, true),
  StructField("pdays", IntegerType, true),
  StructField("previous", IntegerType, true),
  StructField("poutcome", StringType, true),
  StructField("y", StringType, true)))


 val df = sqlContext.
  read.
  schema(bankSchema).
  option("header", "true").
  option("delimiter", ";").
  load("/user/amit.kudnaver_gmail/hadoop/project_bank/Project_Bank.csv").toDF()

  df.registerTempTable("people")
  df.printSchema()
  val distinctage = sqlContext.sql("select distinct age from people")

Any suggestions as to why I am not able to work with the CSV file here, even after supplying the correct schema? Thanks in advance for your advice.

Thanks Amit K

There are 2 best solutions below

The problem is that the DataFrame reader expects a Parquet file when no format is specified, so it tries to read the CSV as Parquet. One way to handle CSV data is to parse it yourself. Here is what you can do.

First of all, remove the header row from the data.

58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no

Next, write the following code to read the data.

Create a case class:

case class BankSchema(
  age: Int, job: String, marital: String, education: String, default: String,
  balance: Int, housing: String, loan: String, contact: String, day: Int,
  month: String, duration: Int, campaign: Int, pdays: Int, previous: Int,
  poutcome: String, y: String)

Read the data from HDFS and parse it:

// toDF() needs `import sqlContext.implicits._` to be in scope
val bankData = sc.textFile("/user/myuser/Project_Bank.csv").
  map(_.split(";")).   // split each line on the semicolon delimiter
  map(p => BankSchema(
    p(0).toInt, p(1), p(2), p(3), p(4), p(5).toInt, p(6), p(7), p(8),
    p(9).toInt, p(10), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toInt,
    p(15), p(16))).
  toDF()

Then register the table and run queries:

bankData.registerTempTable("bankData")
val distinctage = sqlContext.sql("select distinct age from bankData")

Here is what the output of distinctage.show() would look like:

+---+
|age|
+---+
| 33|
| 44|
| 58|
+---+
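
The parsing step above can also be tried on a plain Scala collection before running it on a cluster. The following is a minimal sketch, not part of the original answer: `BankRow` is a hypothetical three-column version of the case class, the sample lines are copied from the question, and the header is filtered out in code instead of being removed from the file. Rows that fail to parse are skipped, which is an assumption about the desired behavior.

```scala
import scala.util.Try

// Hypothetical, trimmed-down version of the case class (three columns only).
case class BankRow(age: Int, job: String, balance: Int)

// Sample lines copied from the question, header included.
val lines = Seq(
  "age;job;marital;education;default;balance;housing;loan;contact;day;month;duration;campaign;pdays;previous;poutcome;y",
  "58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no",
  "44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no",
  "33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no")

val header = lines.head

val rows = lines.
  filter(_ != header).   // drop the header row in code
  map(_.split(";")).     // split on the semicolon delimiter
  flatMap(p => Try(BankRow(p(0).toInt, p(1), p(5).toInt)).toOption) // skip rows that do not parse

val distinctAges = rows.map(_.age).distinct.sorted
println(distinctAges.mkString(", ")) // prints "33, 44, 58"
```

On an RDD the same idea should carry over by taking the header with `first()` and filtering it out before the `map` calls.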

The file is in CSV format, but according to the error Spark is trying to read it as a Parquet file.
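
The byte values in the error message make this visible. As a small sketch (the variable names are only illustrative), decoding them as ASCII shows that Spark was looking for the Parquet magic number `PAR1` at the end of the file and instead found the tail of the last CSV row:

```scala
// Byte values copied from the error message in the question.
val expectedTail = Array[Byte](80, 65, 82, 49)
val foundTail = Array[Byte](110, 111, 13, 10)

// Decode both as ASCII text.
val expectedText = new String(expectedTail, "US-ASCII") // "PAR1", the Parquet magic number
val foundText = new String(foundTail, "US-ASCII")       // "no" followed by CR LF

println(expectedText)   // prints "PAR1"
println(foundText.trim) // prints "no", the last value of the final CSV row
```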

This can be fixed by specifying the file format explicitly, as shown below. The code in the question does not do this, and when no format is specified Spark assumes Parquet by default.

A sample in Java:

Dataset<Row> resultData = session.read().format("csv")
        .option("sep", ";")   // the file in the question is semicolon-delimited
        .option("header", true)
        .option("mode", "DROPMALFORMED")
        .schema(definedSchema)
        .load(inputPath);

Here, the schema can be defined either with a Java class (i.e. a POJO) or with a StructType, as in the question. inputPath is the path of the input CSV file.
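
For the spark-shell session in the question, which uses the spark-csv 1.5.0 package, the same fix might look like the sketch below. It assumes the shell was started with `--packages com.databricks:spark-csv_2.10:1.5.0` and that `sc` is the SparkContext provided by the shell. The `columns` and `intColumns` helpers are only an illustrative shortcut for building the same schema as in the question. On Spark 2.x and later, the built-in `"csv"` format from the Java sample should work in place of the package name.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// `sc` is assumed to be the SparkContext that spark-shell provides.
val sqlContext = new SQLContext(sc)

// Column names from the CSV header; the ones listed in intColumns are integers.
val columns = Seq("age", "job", "marital", "education", "default", "balance",
  "housing", "loan", "contact", "day", "month", "duration", "campaign",
  "pdays", "previous", "poutcome", "y")
val intColumns = Set("age", "balance", "day", "duration", "campaign", "pdays", "previous")

val bankSchema = StructType(columns.map { name =>
  StructField(name, if (intColumns(name)) IntegerType else StringType, nullable = true)
})

val df = sqlContext.read.
  format("com.databricks.spark.csv"). // without a format, load() assumes Parquet
  schema(bankSchema).
  option("header", "true").
  option("delimiter", ";").
  load("/user/amit.kudnaver_gmail/hadoop/project_bank/Project_Bank.csv")

df.registerTempTable("people")
val distinctage = sqlContext.sql("select distinct age from people")
```

Compared with the code in the question, the only functional change is the added `format(...)` call.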