How can I exclude the Avro namespace when converting to JSON?

2.3k Views Asked by At

I am writing a mapper that pushes data to Elasticsearch. My input is an Avro object that I am converting to JSON. Everything works, except that the JSON contains the namespace (the fully-qualified record name) as an object key, and Elasticsearch won't accept that.

"requestobj":{"com.nw.data.Request":{"event_id":null,"event_epoch":-1,"event_dispatch_epoch":-1,"server_epoch":1471852915279,"date":{"string":"2016-08-22"},"time":{"string":"08:01:55"},"req_source":{"string":"app"},"req_channel":{"string":"Mobile"},"req_dimension":{"string":"1312x704"}

Is there a way to exclude the namespace (`com.nw.data.Request`)?

I am using the following code to convert Avro to JSON:

/**
 * Serializes an Avro record to a JSON string using Avro's standard JSON encoder.
 *
 * The record's own schema drives both the datum writer and the encoder.
 *
 * @param record the record to serialize
 * @return the JSON text, decoded from the encoder's output as UTF-8
 * @throws IOException if writing or flushing the encoder fails
 */
public static String getJsonString(GenericRecord record) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(record.getSchema(), buffer);

    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(record.getSchema());
    datumWriter.setSchema(record.getSchema());
    datumWriter.write(record, jsonEncoder);

    // Push any buffered tokens into the byte stream before reading it back.
    jsonEncoder.flush();

    byte[] encoded = buffer.toByteArray();
    String json = new String(encoded, Charset.forName("UTF-8"));
    buffer.close();
    return json;
}
3

There are 3 best solutions below

0
On

Now that Juan's PR has been merged, you can simply call:
encoder.setIncludeNamespace(false);

0
On

You could also write your own JsonEncoder (Scala):

    import java.io.{ByteArrayOutputStream, IOException, OutputStream}
    import java.nio.ByteBuffer
    import java.util

    import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter}
    import org.apache.avro.io.{DecoderFactory, ParsingEncoder}
    import org.apache.avro.io.parsing.{JsonGrammarGenerator, Parser, Symbol}
    import org.apache.avro.util.Utf8
    import org.apache.avro.{AvroTypeException, Schema}
    import org.codehaus.jackson.util.{DefaultPrettyPrinter, MinimalPrettyPrinter}
    import org.codehaus.jackson.{JsonEncoding, JsonFactory, JsonGenerator}


    /**
      * Helpers that turn Avro data (raw binary or an already-decoded datum) into
      * pretty-printed JSON, using the `JsonEncoder` class declared in this file.
      */
    object inJson{

      /**
        * Decodes an Avro binary payload into a generic datum.
        *
        * @param binary       Avro binary-encoded bytes
        * @param schemaWriter schema the bytes were written with
        * @param schemaReader schema to resolve the data into; defaults to `schemaWriter`
        * @return the datum produced by `GenericDatumReader.read`
        */
      private def toObject(binary: Array[Byte], schemaWriter: Schema)(schemaReader : Schema = schemaWriter) : Object = {
        val decoder = DecoderFactory.get().binaryDecoder(binary, null)
        val reader = new GenericDatumReader[Object](schemaWriter, schemaReader)
        reader.read(null, decoder)
      }

      /**
        * Serializes `object` to pretty-printed JSON, driven by `schemaWriter`.
        *
        * @param `object`     the datum to encode; must conform to `schemaWriter`
        * @param schemaWriter schema used by both the datum writer and the JSON encoder
        * @return the JSON text
        */
      private def toJson(`object`: Object, schemaWriter: Schema): String = {
        val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream()
        try {
          val encoder = new JsonEncoder(schemaWriter, outputStream, true)
          val writer = new GenericDatumWriter[Object](schemaWriter)
          writer.write(`object`, encoder)
          encoder.flush()
          // The JSON generator is created with JsonEncoding.UTF8. Decode explicitly as UTF-8:
          // the no-arg toString would use the platform default charset and corrupt
          // non-ASCII text on JVMs whose default is not UTF-8.
          outputStream.toString("UTF-8")
        } finally {
          outputStream.close()
        }
      }

      /**
        * Decodes Avro binary with `schemaWriter`/`schemaReader` and re-encodes it as JSON
        * with `schemaJson`.
        *
        * @return the JSON text, or "" when decoding yields null
        */
      def ~>(binary: Array[Byte], schemaWriter: Schema, schemaReader : Schema, schemaJson : Schema) : String = {
        Option(toObject(binary, schemaWriter)(schemaReader)).map(toJson(_, schemaJson)).getOrElse("")
      }

      /**
        * Encodes an already-decoded datum as JSON with `schemaJson`.
        *
        * `schemaWriter` and `schemaReader` are not used by this overload.
        */
      def ~>(`object`: Object, schemaWriter: Schema, schemaReader : Schema, schemaJson : Schema) : String = {
        toJson(`object`, schemaJson)
      }
    }

    /** Companion of the custom `JsonEncoder`: builds the Jackson generator it writes to. */
    object JsonEncoder{
      private val LINE_SEPARATOR: String = System.getProperty("line.separator")

      /**
        * Creates a UTF-8 Jackson generator on top of `out`.
        *
        * When `pretty` is false, a minimal printer is used and each root-level value is
        * followed by the platform line separator. When `pretty` is true, Jackson's default
        * pretty printer is used, with nothing written between root-level values.
        *
        * @param out    destination stream; must not be null
        * @param pretty whether to indent the output
        * @return the configured generator
        * @throws NullPointerException if `out` is null
        */
      @throws[IOException]
      private def getJsonGenerator(out: OutputStream, pretty: Boolean): JsonGenerator = {
        if (out eq null) throw new NullPointerException("OutputStream cannot be null")

        val generator: JsonGenerator = new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8)
        if (!pretty) {
          val minimal = new MinimalPrettyPrinter
          minimal.setRootValueSeparator(LINE_SEPARATOR)
          generator.setPrettyPrinter(minimal)
        } else {
          generator.setPrettyPrinter(new DefaultPrettyPrinter() {
            // Intentionally a no-op: pretty mode writes no separator between root values.
            @throws[IOException]
            override def writeRootValueSeparator(jg: JsonGenerator): Unit = ()
          })
        }
        generator
      }
    }

    /**
      * Avro `Encoder` that writes JSON, driven by a grammar generated from `schema`.
      *
      * A union value is written directly. `writeIndex` only selects the branch: it does not
      * open a `{"<branch name>": ...}` wrapper object. Keys such as `"com.nw.data.Request"`
      * therefore do not appear in the output.
      *
      * @param schema schema the written datum must conform to
      * @param out    Jackson generator that receives the JSON tokens
      */
    class JsonEncoder(schema : Schema, out: JsonGenerator) extends ParsingEncoder with Parser.ActionHandler{

      /** Grammar-driven parser that checks every write call against `schema`. */
      val parser = new Parser(new JsonGrammarGenerator().generate(schema), this)

      /**
        * Has anything been written into the collections?
        * Bit `d` is set while the array/map opened at nesting depth `d` has no items yet.
        */
      protected var isEmpty: util.BitSet = new util.BitSet

      /** Creates an encoder on `out` using the non-pretty generator. */
      def this(sc: Schema, out: OutputStream) = this(sc, JsonEncoder.getJsonGenerator(out, false))

      /** Creates an encoder on `out`, pretty-printing when `pretty` is true. */
      def this(sc: Schema, out: OutputStream, pretty: Boolean) = this(sc, JsonEncoder.getJsonGenerator(out, pretty))

      /**
        * Selects union branch `unionIndex`.
        *
        * Only the chosen branch's symbol is pushed. No wrapper object is started and no
        * UNION_END is pushed, so the branch value is emitted bare.
        */
      @throws[IOException]
      override def writeIndex(unionIndex: Int): Unit = {
        parser.advance(Symbol.UNION)
        val alternatives = parser.popSymbol.asInstanceOf[Symbol.Alternative]
        parser.pushSymbol(alternatives.getSymbol(unionIndex))
      }

      /** Runs pending implicit parser actions, then flushes the generator (if any). */
      @throws[IOException]
      override def flush(): Unit = {
        parser.processImplicitActions()
        if (out != null) out.flush()
      }

      @throws[IOException]
      override def writeNull(): Unit = {
        parser.advance(Symbol.NULL)
        out.writeNull()
      }

      @throws[IOException]
      override def writeBoolean(b: Boolean): Unit = {
        parser.advance(Symbol.BOOLEAN)
        out.writeBoolean(b)
      }

      @throws[IOException]
      override def writeInt(n: Int): Unit = {
        parser.advance(Symbol.INT)
        out.writeNumber(n)
      }

      @throws[IOException]
      override def writeLong(n: Long): Unit = {
        parser.advance(Symbol.LONG)
        out.writeNumber(n)
      }

      @throws[IOException]
      override def writeFloat(f: Float): Unit = {
        parser.advance(Symbol.FLOAT)
        out.writeNumber(f)
      }

      @throws[IOException]
      override def writeDouble(d: Double): Unit = {
        parser.advance(Symbol.DOUBLE)
        out.writeNumber(d)
      }

      @throws[IOException]
      override def writeString(utf8: Utf8): Unit = {
        writeString(utf8.toString)
      }

      /** Writes `str` as a field name when the parser expects a map key, otherwise as a string value. */
      @throws[IOException]
      override def writeString(str: String): Unit = {
        parser.advance(Symbol.STRING)
        if (parser.topSymbol eq Symbol.MAP_KEY_MARKER) {
          parser.advance(Symbol.MAP_KEY_MARKER)
          out.writeFieldName(str)
        }
        else out.writeString(str)
      }

      /** Writes the remaining bytes of `bytes` (position to limit) without moving its position. */
      @throws[IOException]
      override def writeBytes(bytes: ByteBuffer): Unit = {
        if (bytes.hasArray) {
          // array() exposes the whole backing array; a sliced/offset buffer's data
          // starts at arrayOffset, so position alone is the wrong index.
          writeBytes(bytes.array, bytes.arrayOffset + bytes.position, bytes.remaining)
        } else {
          val copy: Array[Byte] = new Array[Byte](bytes.remaining)
          bytes.duplicate.get(copy)
          writeBytes(copy)
        }
      }

      @throws[IOException]
      override def writeBytes(bytes: Array[Byte], start: Int, len: Int): Unit = {
        parser.advance(Symbol.BYTES)
        writeByteArray(bytes, start, len)
      }

      /** Emits bytes as a JSON string, mapping each byte to one ISO-8859-1 character. */
      @throws[IOException]
      private def writeByteArray(bytes: Array[Byte], start: Int, len: Int): Unit = {
        out.writeString(new String(bytes, start, len, "ISO-8859-1"))
      }

      /** Writes a fixed value after checking `len` against the schema's declared size. */
      @throws[IOException]
      override def writeFixed(bytes: Array[Byte], start: Int, len: Int): Unit = {
        parser.advance(Symbol.FIXED)
        val top: Symbol.IntCheckAction = parser.popSymbol.asInstanceOf[Symbol.IntCheckAction]
        if (len != top.size) throw new AvroTypeException(s"Incorrect length for fixed binary: expected ${top.size} but received $len bytes.")
        writeByteArray(bytes, start, len)
      }

      /** Writes the label of enum symbol `e`; throws if `e` is out of range. */
      @throws[IOException]
      override def writeEnum(e: Int): Unit = {
        parser.advance(Symbol.ENUM)
        val top: Symbol.EnumLabelsAction = parser.popSymbol.asInstanceOf[Symbol.EnumLabelsAction]
        if (e < 0 || e >= top.size) throw new AvroTypeException(s"Enumeration out of range: max is ${top.size} but received $e")
        out.writeString(top.getLabel(e))
      }

      @throws[IOException]
      override def writeArrayStart(): Unit = {
        parser.advance(Symbol.ARRAY_START)
        out.writeStartArray()
        push()
        isEmpty.set(depth)
      }

      @throws[IOException]
      override def writeArrayEnd(): Unit = {
        // Close the last item only if at least one item was started.
        if (!isEmpty.get(pos)) parser.advance(Symbol.ITEM_END)
        pop()
        parser.advance(Symbol.ARRAY_END)
        out.writeEndArray()
      }

      @throws[IOException]
      override def writeMapStart(): Unit = {
        push()
        isEmpty.set(depth)
        parser.advance(Symbol.MAP_START)
        out.writeStartObject()
      }

      @throws[IOException]
      override def writeMapEnd(): Unit = {
        if (!isEmpty.get(pos)) parser.advance(Symbol.ITEM_END)
        pop()
        parser.advance(Symbol.MAP_END)
        out.writeEndObject()
      }

      /** Begins a collection item, closing the previous item first when there was one. */
      @throws[IOException]
      override def startItem(): Unit = {
        if (!isEmpty.get(pos)) parser.advance(Symbol.ITEM_END)
        super.startItem()
        isEmpty.clear(depth)
      }

      /**
        * Parser callback for action symbols: writes record field names and record
        * start/end. UNION_END is handled for completeness, although `writeIndex` in
        * this class never pushes it.
        *
        * @throws AvroTypeException for any other action symbol
        */
      @throws[IOException]
      override def doAction(input: Symbol, top: Symbol): Symbol = {
        top match {
          case fa: Symbol.FieldAdjustAction => out.writeFieldName(fa.fname)
          case s if s eq Symbol.RECORD_START => out.writeStartObject()
          case s if (s eq Symbol.RECORD_END) || (s eq Symbol.UNION_END) => out.writeEndObject()
          case s if s eq Symbol.FIELD_END => ()
          case _ => throw new AvroTypeException("Unknown action symbol " + top)
        }
        null
      }
    }
0
On

As Stéphane mentioned, so far the only way is to create your own JsonEncoder. Interestingly, the only method you actually need to change is writeIndex. For every non-null union value, it wraps the value in a JSON object keyed by the union branch's name (for example {"string": ...} or {"com.nw.data.Request": ...}); that wrapper is where the namespace comes from. This is my code:

/**
 * Avro {@code Encoder} that writes JSON, driven by a grammar generated from the schema.
 *
 * A union value is written directly. {@link #writeIndex(int)} only selects the branch: it
 * does not open a {@code {"<branch name>": ...}} wrapper object. Keys such as
 * {@code "com.nw.data.Request"} or {@code "string"} therefore do not appear in the output.
 */
public class CustomJsonEncoder extends ParsingEncoder implements Parser.ActionHandler {
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
    /** Grammar-driven parser that checks every write call against the schema. */
    final Parser parser;
    /** Destination for JSON tokens; replaced by {@link #configure(OutputStream)}. */
    private JsonGenerator out;
    /** Bit {@code d} is set while the array/map opened at depth {@code d} has no items yet. */
    protected BitSet isEmpty = new BitSet();

    /** Creates an encoder on {@code out} without pretty-printing. */
    public CustomJsonEncoder(Schema sc, OutputStream out) throws IOException {
        this(sc, getJsonGenerator(out, false));
    }

    /** Creates an encoder on {@code out}, pretty-printing when {@code pretty} is true. */
    public CustomJsonEncoder(Schema sc, OutputStream out, boolean pretty) throws IOException {
        this(sc, getJsonGenerator(out, pretty));
    }

    /** Creates an encoder that writes to an existing Jackson generator. */
    public CustomJsonEncoder(Schema sc, JsonGenerator out) throws IOException {
        // parser is still null here, so configure() skips its flush.
        configure(out);
        this.parser = new Parser(new JsonGrammarGenerator().generate(sc), this);
    }

    /** Runs pending implicit parser actions, then flushes the generator (if any). */
    @Override
    public void flush() throws IOException {
        parser.processImplicitActions();
        if (out != null) {
            out.flush();
        }
    }

    /**
     * Creates a UTF-8 generator on {@code out}. Each root-level value is followed by the
     * platform line separator, both in pretty mode and in minimal mode.
     *
     * @throws NullPointerException if {@code out} is null
     */
    private static JsonGenerator getJsonGenerator(OutputStream out, boolean pretty) throws IOException {
        if (null == out)
            throw new NullPointerException("OutputStream cannot be null");
        JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
        if (pretty) {
            DefaultPrettyPrinter pp = new DefaultPrettyPrinter() {
                @Override
                public void writeRootValueSeparator(JsonGenerator jg) throws IOException {
                    jg.writeRaw(LINE_SEPARATOR);
                }
            };
            g.setPrettyPrinter(pp);
        } else {
            MinimalPrettyPrinter pp = new MinimalPrettyPrinter();
            pp.setRootValueSeparator(LINE_SEPARATOR);
            g.setPrettyPrinter(pp);
        }
        return g;
    }

    /** Redirects output to {@code out} (non-pretty), flushing pending output first. */
    public CustomJsonEncoder configure(OutputStream out) throws IOException {
        this.configure(getJsonGenerator(out, false));
        return this;
    }

    private CustomJsonEncoder configure(JsonGenerator generator) throws IOException {
        if (null == generator)
            throw new NullPointerException("JsonGenerator cannot be null");
        if (null != parser) {
            flush();
        }
        this.out = generator;
        return this;
    }

    @Override
    public void writeNull() throws IOException {
        parser.advance(Symbol.NULL);
        out.writeNull();
    }

    @Override
    public void writeBoolean(boolean b) throws IOException {
        parser.advance(Symbol.BOOLEAN);
        out.writeBoolean(b);
    }

    @Override
    public void writeInt(int n) throws IOException {
        parser.advance(Symbol.INT);
        out.writeNumber(n);
    }

    @Override
    public void writeLong(long n) throws IOException {
        parser.advance(Symbol.LONG);
        out.writeNumber(n);
    }

    @Override
    public void writeFloat(float f) throws IOException {
        parser.advance(Symbol.FLOAT);
        out.writeNumber(f);
    }

    @Override
    public void writeDouble(double d) throws IOException {
        parser.advance(Symbol.DOUBLE);
        out.writeNumber(d);
    }

    @Override
    public void writeString(Utf8 utf8) throws IOException {
        writeString(utf8.toString());
    }

    /** Writes {@code str} as a field name when a map key is expected, otherwise as a string value. */
    @Override
    public void writeString(String str) throws IOException {
        parser.advance(Symbol.STRING);
        if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) {
            parser.advance(Symbol.MAP_KEY_MARKER);
            out.writeFieldName(str);
        } else {
            out.writeString(str);
        }
    }

    /** Writes the remaining bytes of {@code bytes} (position to limit) without moving its position. */
    @Override
    public void writeBytes(ByteBuffer bytes) throws IOException {
        if (bytes.hasArray()) {
            // array() exposes the whole backing array; a sliced/offset buffer's data
            // starts at arrayOffset(), so position() alone is the wrong index.
            writeBytes(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
        } else {
            byte[] b = new byte[bytes.remaining()];
            bytes.duplicate().get(b);
            writeBytes(b);
        }
    }

    @Override
    public void writeBytes(byte[] bytes, int start, int len) throws IOException {
        parser.advance(Symbol.BYTES);
        writeByteArray(bytes, start, len);
    }

    /** Emits bytes as a JSON string, mapping each byte to one ISO-8859-1 character. */
    private void writeByteArray(byte[] bytes, int start, int len) throws IOException {
        out.writeString(new String(bytes, start, len, StandardCharsets.ISO_8859_1));
    }

    @Override
    public void writeFixed(byte[] bytes, int start, int len) throws IOException {
        parser.advance(Symbol.FIXED);
        Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
        if (len != top.size) {
            throw new AvroTypeException(
                    "Incorrect length for fixed binary: expected " + top.size + " but received " + len + " bytes.");
        }
        writeByteArray(bytes, start, len);
    }

    @Override
    public void writeEnum(int e) throws IOException {
        parser.advance(Symbol.ENUM);
        Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol();
        if (e < 0 || e >= top.size) {
            throw new AvroTypeException("Enumeration out of range: max is " + top.size + " but received " + e);
        }
        out.writeString(top.getLabel(e));
    }

    @Override
    public void writeArrayStart() throws IOException {
        parser.advance(Symbol.ARRAY_START);
        out.writeStartArray();
        push();
        isEmpty.set(depth());
    }

    @Override
    public void writeArrayEnd() throws IOException {
        // Close the last item only if at least one item was started.
        if (!isEmpty.get(pos)) {
            parser.advance(Symbol.ITEM_END);
        }
        pop();
        parser.advance(Symbol.ARRAY_END);
        out.writeEndArray();
    }

    @Override
    public void writeMapStart() throws IOException {
        push();
        isEmpty.set(depth());

        parser.advance(Symbol.MAP_START);
        out.writeStartObject();
    }

    @Override
    public void writeMapEnd() throws IOException {
        if (!isEmpty.get(pos)) {
            parser.advance(Symbol.ITEM_END);
        }
        pop();

        parser.advance(Symbol.MAP_END);
        out.writeEndObject();
    }

    /** Begins a collection item, closing the previous item first when there was one. */
    @Override
    public void startItem() throws IOException {
        if (!isEmpty.get(pos)) {
            parser.advance(Symbol.ITEM_END);
        }
        super.startItem();
        isEmpty.clear(depth());
    }

    /**
     * Selects union branch {@code unionIndex}. Only the branch symbol is pushed: no wrapper
     * object is started and no UNION_END is pushed, so the value is written bare.
     */
    @Override
    public void writeIndex(int unionIndex) throws IOException {
        parser.advance(Symbol.UNION);
        Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol();
        Symbol symbol = top.getSymbol(unionIndex);
        parser.pushSymbol(symbol);
    }

    /**
     * Parser callback for action symbols: writes record field names and record start/end.
     * UNION_END is handled for completeness, although {@link #writeIndex(int)} never pushes it.
     *
     * @throws AvroTypeException for any other action symbol
     */
    @Override
    public Symbol doAction(Symbol input, Symbol top) throws IOException {
        if (top instanceof Symbol.FieldAdjustAction) {
            Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
            out.writeFieldName(fa.fname);
        } else if (top == Symbol.RECORD_START) {
            out.writeStartObject();
        } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) {
            out.writeEndObject();
        } else if (top != Symbol.FIELD_END) {
            throw new AvroTypeException("Unknown action symbol " + top);
        }
        return null;
    }
}

Update

I created a PR that adds this feature to the JsonEncoder class; let's see if it gets accepted.

https://github.com/apache/avro/pull/508