How to have a JSON field in Avro?


I have a JSON record like the one below:

val warningJson = """{"p1":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}","p2":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}","p3":"{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}"}"""

I want to parse it into an Avro record, passing a schema like this:

val outputSchemaStringTestData = """{"type":"record","name":"Warning","namespace":"test","fields":[{"name":"p1","type":"string"},{"name":"p2","type":"string"},{"name":"p3","type":"string"}]}"""

With this schema I can create a generic record by hand using the following code:

// Assumption: outputSchemaTestData is the Schema parsed from outputSchemaStringTestData.
val outputSchemaTestData: Schema = new Schema.Parser().parse(outputSchemaStringTestData)
val genericRecord: GenericRecord = new GenericData.Record(outputSchemaTestData)
genericRecord.put("p1", """{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}""")
genericRecord.put("p2", """{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}""")
genericRecord.put("p3", """{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}""")

However, when I use the same schema to parse warningJson with the code below, passing outputSchemaStringTestData as the schema, I get the following error:

com.fasterxml.jackson.core.JsonParseException: Unexpected character ('t' (code 116)): was expecting comma to separate Object entries
 at [Source: (String)"{"p1":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}","p2":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}","p3":"{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}"}"; line: 1, column: 11]
org.apache.avro.SchemaParseException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('t' (code 116)): was expecting comma to separate Object entries
 at [Source: (String)"{"p1":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499008876", "amount": 6094, "state": "SUCCESS"}","p2":"{"trasanction_id": 197, "customer_id": 27, "datetime": "1576499017565", "amount": 547, "state": "SUCCESS"}","p3":"{"trasanction_id": 198, "customer_id": 27, "datetime": "1576499029116", "amount": 6824, "state": "SUCCESS"}"}"; line: 1, column: 11]

My code for the conversion:

package com.Izac.Cep.KafkaSourceAndSink;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;

import java.io.*;
import java.nio.charset.StandardCharsets;

public class JsonToAvro {
    // Converts the JSON text to Avro binary, then decodes that binary back into a GenericRecord.
    public static GenericRecord createGenericRecord(final String schemastr, final String json) throws Exception {
        // Schema.Parser replaces the deprecated static Schema.parse(String).
        Schema schema = new Schema.Parser().parse(schemastr);
        byte[] avroByteArray = fromJasonToAvro(json, schemastr);

        DatumReader<GenericRecord> reader1 = new GenericDatumReader<GenericRecord>(schema);

        Decoder decoder1 = DecoderFactory.get().binaryDecoder(avroByteArray, null);
        return reader1.read(null, decoder1);
    }

    // Reads the JSON text against the schema and re-encodes the datum as Avro binary.
    private static byte[] fromJasonToAvro(String json, String schemastr) throws Exception {
        Schema schema = new Schema.Parser().parse(schemastr);
        // Use an explicit charset instead of the platform default.
        InputStream input = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));

        // The JSON decoder expects well-formed JSON that matches the schema.
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, input);

        DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
        Object datum = reader.read(null, decoder);

        GenericDatumWriter<Object> w = new GenericDatumWriter<Object>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);

        w.write(datum, e);
        e.flush();

        return outputStream.toByteArray();
    }
}
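
A note on the error itself: the JsonParseException points at line 1, column 11, which is the `t` of `trasanction_id`. The quote just before it closes the `p1` string early, because the quotes inside the nested objects are not escaped, so warningJson is not valid JSON as written. Below is a minimal sketch, assuming the nested objects are meant to travel as plain strings (as the `"type":"string"` fields in the schema suggest). The class `NestedJsonEscaping` and its methods `quoteAsJsonString` and `buildOuterJson` are hypothetical names made up for illustration, not part of Avro or Jackson, and the sketch only shows the escaping step using the standard library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Avro or Jackson): builds an outer JSON object
// whose values are JSON strings that carry nested JSON text.
final class NestedJsonEscaping {

    // Quote a raw string as a JSON string literal, escaping the quote,
    // the backslash, and control characters.
    static String quoteAsJsonString(String raw) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the 4-digit hex escape form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Build {"p1":"...","p2":"..."} where every value is an escaped JSON string.
    static String buildOuterJson(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(quoteAsJsonString(e.getKey()))
              .append(':')
              .append(quoteAsJsonString(e.getValue()));
            first = false;
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // Shortened sample payload; the real records have more fields.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("p1", "{\"trasanction_id\": 197, \"state\": \"SUCCESS\"}");
        fields.put("p2", "{\"trasanction_id\": 198, \"state\": \"SUCCESS\"}");
        System.out.println(buildOuterJson(fields));
    }
}
```

The resulting string could then be passed as the `json` argument of `createGenericRecord`. I have not verified that end to end against Avro here, so treat it as a starting point. In practice a JSON library would normally do this escaping, for example Jackson's `ObjectMapper.writeValueAsString`, rather than a hand-written loop.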