I have a JSON record like the one below:
val warningJson = """{"p1":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}","p2":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}","p3":"{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}"}"""
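As an aside on the format: each `pN` value is itself a JSON object embedded as a string, so for the outer payload to be valid JSON the inner quotes have to be backslash-escaped. A minimal stdlib-only sketch (placeholder values, no Avro involved; `embed` is just an illustrative helper, not part of my code):

```java
// Sketch: embedding a transaction object as a string value inside an
// outer JSON object. The inner quotes are backslash-escaped so the
// result stays valid JSON. (Placeholder values; no Avro involved.)
public class EscapeSketch {
    static String embed(String fieldName, String innerJson) {
        // Escape backslashes first, then quotes, so the inner object
        // can live inside a JSON string value.
        String escaped = innerJson.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"" + fieldName + "\":\"" + escaped + "\"}";
    }

    public static void main(String[] args) {
        String inner = "{\"trasanction_id\": 197, \"amount\": 6094}";
        System.out.println(embed("p1", inner));
        // prints: {"p1":"{\"trasanction_id\": 197, \"amount\": 6094}"}
    }
}
```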
However, I want to parse it into an Avro data record, passing a schema like this:
val outputSchemaStringTestData = """{"type":"record","name":"Warning","namespace":"test","fields":[{"name":"p1","type":"string"},{"name":"p2","type":"string"},{"name":"p3","type":"string"}]}"""
Using this schema, I am able to create a generic record with the following code:
val genericRecord: GenericRecord = new GenericData.Record(outputSchemaTestData)
genericRecord.put("p1", """{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}""")
genericRecord.put("p2", """{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}""")
genericRecord.put("p3", """{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}""")
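For completeness, this is roughly how that hand-built record can then be serialized to Avro binary (a sketch, assuming the Avro jars are on the classpath; the field values are shortened placeholders):

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class RecordWriteSketch {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Warning\",\"namespace\":\"test\","
                + "\"fields\":[{\"name\":\"p1\",\"type\":\"string\"},"
                + "{\"name\":\"p2\",\"type\":\"string\"},"
                + "{\"name\":\"p3\",\"type\":\"string\"}]}");
        GenericRecord rec = new GenericData.Record(schema);
        rec.put("p1", "{\"trasanction_id\": 197, \"amount\": 6094}");
        rec.put("p2", "{\"trasanction_id\": 197, \"amount\": 547}");
        rec.put("p3", "{\"trasanction_id\": 198, \"amount\": 6824}");
        // Serialize the record to Avro binary bytes.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(rec, enc);
        enc.flush();
        System.out.println(out.toByteArray().length + " bytes");
    }
}
```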
However, when I use the same schema to parse warningJson with the code below (passing outputSchemaStringTestData), I get the following error:
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('t' (code 116)): was expecting comma to separate Object entries
at [Source: (String)"{"p1":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}","p2":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}","p3":"{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}"}"; line: 1, column: 11]
org.apache.avro.SchemaParseException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('t' (code 116)): was expecting comma to separate Object entries
at [Source: (String)"{"p1":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}","p2":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}","p3":"{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}"}"; line: 1, column: 11]
My code for the conversion:
package com.Izac.Cep.KafkaSourceAndSink;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import java.io.*;
public class JsonToAvro {
public static GenericRecord createGenericRecord(final String schemastr, final String json) throws Exception {
// Schema.parse(String) is deprecated; use Schema.Parser instead.
Schema schema = new Schema.Parser().parse(schemastr);
byte[] avroByteArray = fromJsonToAvro(json, schemastr);
DatumReader<GenericRecord> reader1 = new GenericDatumReader<GenericRecord>(schema);
Decoder decoder1 = DecoderFactory.get().binaryDecoder(avroByteArray, null);
GenericRecord result = reader1.read(null, decoder1);
return result;
}
private static byte[] fromJsonToAvro(String json, String schemastr) throws Exception {
Schema schema = new Schema.Parser().parse(schemastr);
InputStream input = new ByteArrayInputStream(json.getBytes("UTF-8")); // be explicit about the encoding
DataInputStream din = new DataInputStream(input);
// Decode the JSON input against the schema...
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
Object datum = reader.read(null, decoder);
// ...then re-encode the datum as Avro binary.
GenericDatumWriter<Object> w = new GenericDatumWriter<Object>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);
w.write(datum, e);
e.flush();
return outputStream.toByteArray();
}
}
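For reference, a stripped-down version of the same decode path (single field, Avro jars on the classpath assumed, illustrative payload) does work when the inner quotes in the payload are escaped:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

public class JsonDecodeSketch {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Warning\",\"namespace\":\"test\","
                + "\"fields\":[{\"name\":\"p1\",\"type\":\"string\"}]}");
        // p1's value is one JSON-encoded string: note the escaped quotes.
        String json = "{\"p1\":\"{\\\"amount\\\": 6094}\"}";
        // Decode the JSON directly into a GenericRecord.
        GenericRecord rec = new GenericDatumReader<GenericRecord>(schema)
                .read(null, DecoderFactory.get().jsonDecoder(schema, json));
        System.out.println(rec.get("p1"));
        // prints: {"amount": 6094}
    }
}
```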