When I create an RDD from Avro GenericRecords, immediately collect it, and print the records, I get wrong field values: every field in a record ends up holding the value of the record's first field (per the schema order), i.e.
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData.Record

def createGenericRecord(first: String, second: String) = {
  val schemaString =
    """
      |{
      |  "type": "record",
      |  "name": "test_schema",
      |  "fields": [
      |    { "name": "test_field1", "type": "string" },
      |    { "name": "test_field2", "type": ["null", "string"] }
      |  ]
      |}
    """.stripMargin
  val parser = new Schema.Parser()
  parser.setValidate(true)
  parser.setValidateDefaults(true)
  val schema = parser.parse(schemaString)
  val genericRecord = new Record(schema)
  genericRecord.put("test_field1", first)
  genericRecord.put("test_field2", second)
  genericRecord
}
val record1 = createGenericRecord("test1", "test2")
val record2 = createGenericRecord("test3", "test4")
println(record1) // prints {"test_field1": "test1", "test_field2": "test2"}
println(record2) // prints {"test_field1": "test3", "test_field2": "test4"}

val t = sc.makeRDD(Seq(record1, record2))
val collected = t.collect()
println(collected(0)) // prints {"test_field1": "test1", "test_field2": "test1"}
println(collected(1)) // prints {"test_field1": "test3", "test_field2": "test3"}
I am using Spark 1.2.0 with spark.serializer set to org.apache.spark.serializer.KryoSerializer.
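For reference, a minimal sketch of how that serializer is configured; the app name and master below are placeholders, only the spark.serializer setting is relevant:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical setup; only the spark.serializer entry matters for this issue.
val conf = new SparkConf()
  .setAppName("avro-kryo-test") // placeholder app name
  .setMaster("local[*]")        // placeholder master
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)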
The solution to this problem is to update the org.apache.avro % avro dependency to version 1.7.7.
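Assuming an sbt build (adjust accordingly for Maven), the bump looks like this:

// build.sbt: update the Avro dependency to 1.7.7
libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"

After rebuilding with the updated dependency, collect() returns the records with the expected field values.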