Creating an RDD from a sequence of GenericRecords in Spark changes field values in the generic records


When I create an RDD from GenericRecords (Avro), immediately collect it, and print the records, I get wrong field values: every field ends up holding the value of the first field in the schema, e.g.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData.Record

def createGenericRecord(first: String, second: String) = {
  val schemaString =
    """
      |{
      | "type": "record",
      | "name": "test_schema",
      | "fields": [
      |   { "name": "test_field1", "type": "string" },
      |   { "name": "test_field2", "type": ["null", "string"] }
      | ]
      |}
    """.stripMargin
  val parser = new Schema.Parser()
  parser.setValidate(true)
  parser.setValidateDefaults(true)
  val schema = parser.parse(schemaString)
  // Build a generic (schema-driven) record and fill both fields
  val genericRecord = new Record(schema)
  genericRecord.put("test_field1", first)
  genericRecord.put("test_field2", second)
  genericRecord
}

val record1 = createGenericRecord("test1","test2")
val record2 = createGenericRecord("test3","test4")

println(record1)//prints {"test_field1": "test1", "test_field2": "test2"}
println(record2)//prints {"test_field1": "test3", "test_field2": "test4"} 

val t = sc.makeRDD(Seq(record1, record2))
val collected = t.collect()
println(collected(0))//prints {"test_field1": "test1", "test_field2": "test1"}
println(collected(1))//prints {"test_field1": "test3", "test_field2": "test3"}          

I am using Spark 1.2.0 with spark.serializer set to org.apache.spark.serializer.KryoSerializer.
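
For reference, the serializer is configured like this (a minimal sketch; the app name and master shown here are placeholder assumptions, not part of the original setup):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("avro-kryo-test") // placeholder app name
  .setMaster("local[*]")        // placeholder master
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)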

1 Answer


The solution to this problem is to update the org.apache.avro % avro dependency to version 1.7.7.
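
For reference, a minimal sketch of the corresponding change in an sbt build (assuming sbt; the equivalent Maven <dependency> version bump works too):

// build.sbt -- pin Avro to 1.7.7 (assumed sbt build)
libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"

With Avro 1.7.7 on the classpath, the collected records should print with their original field values again.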