Spark SQL DataFrame cannot be saved as a Parquet file due to a deserialization error

Hi, I am reading Parquet files in a Spark SQL context and merging the schemas of the various files. I then take the DataFrame returned by the read and save it with the write.parquet command.

Here is the code: 

    val options: Map[String, String] = Map[String, String]()
    val options2 = options + ("mergeSchema" -> "true")
    // sqc is the SQLContext; paths is a Seq[String] of Parquet file paths
    val df = sqc.read.options(options2).schema(schemaObj).parquet(paths: _*)
    df.printSchema()
    df.coalesce(1).write.parquet("/home/abc/")

Here, schemaObj is of type Parquet[StructType], which was converted from an Avro schema.
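
A minimal sketch of that conversion, using the AvroSchemaConverter from the parquet-avro dependency listed below (this is an illustration, not the exact code used here; avroSchemaJson stands in for the actual .avsc definition, and the wrapping into the Parquet[StructType] passed to .schema() is not shown):

    import org.apache.avro.Schema
    import parquet.avro.AvroSchemaConverter
    import parquet.schema.MessageType

    // Parse the Avro schema from its JSON (.avsc) definition (avroSchemaJson is a placeholder)
    val avroSchema: Schema = new Schema.Parser().parse(avroSchemaJson)

    // parquet-avro translates the Avro record schema into a Parquet MessageType
    val parquetSchema: MessageType = new AvroSchemaConverter().convert(avroSchema)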

My sbt dependencies are as follows:

    "org.apache.spark"     % "spark-core_2.10"          % "1.6.0" %provided",
    "org.apache.spark"     % "spark-sql_2.10"                % "1.6.0",
    "com.amazonaws"        % "aws-java-sdk"                  % "1.9.27",
    "com.databricks"       % "spark-avro_2.10"               % "2.0.1",
    "org.apache.avro"      % "avro"                          % "1.7.6",
    "io.confluent"         % "kafka-avro-serializer"         % "1.0",
    "mysql"                % "mysql-connector-java"          % "5.1.6",
    "io.confluent"         %"kafka-schema-registry-client"   % "2.0.1",
    "com.twitter"            %"parquet-avro"                 % "1.6.0"



I am getting the following error:

    Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope)
     at [Source: {"id":"0","name":"ExecutedCommand"}; line: 1, column: 1]
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
        at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
        at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
        at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
        at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
        at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
        at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)
        at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:439)
        at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3668)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3560)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2580)
        at org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:85)
        at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1616)
        at org.apache.spark.rdd.RDD$$anonfun$34.apply(RDD.scala:1616)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.rdd.RDD.<init>(RDD.scala:1616)
        at org.apache.spark.rdd.SqlNewHadoopRDD.<init>(SqlNewHadoopRDD.scala:64)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1.<init>(ParquetRelation.scala:327)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1.apply(ParquetRelation.scala:327)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1.apply(ParquetRelation.scala:327)
        at org.apache.spark.util.Utils$.withDummyCallSite(Utils.scala:2165)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.buildInternalScan(ParquetRelation.scala:326)
        at org.apache.spark.sql.sources.HadoopFsRelation.buildInternalScan(interfaces.scala:661)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:113)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:113)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:274)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:273)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:352)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:269)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:109)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
        at org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:1837)

There is 1 answer below

I was able to resolve the issue by adding the following dependency override:

    dependencyOverrides ++= Set(
      "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4"
    )

Not sure, but maybe parquet-mr uses an older version of jackson-databind. That library was used to convert the Avro schema to a Parquet schema.
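
For reference, the override is a top-level setting in build.sbt, next to libraryDependencies. The sketch below is abridged and illustrative; 2.4.4 appears to be the jackson-databind version Spark 1.6.0 itself depends on, so pinning it keeps every transitive copy consistent with what Spark expects:

    // Abridged build.sbt sketch (illustrative; remaining dependencies omitted)
    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided",
      "org.apache.spark" % "spark-sql_2.10"  % "1.6.0"
      // ... other dependencies as listed in the question
    )

    // Force a single jackson-databind version across all transitive pulls
    dependencyOverrides ++= Set(
      "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4"
    )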