Confluent Kafka Producer - .NET Producer auto schema registry & Flink SQL

I am developing a Kafka producer application in C# (.NET) and working with both JsonSerializer and AvroSerializer (at this point, I haven't determined which one I should use).

Both serializers let me register the schema in Schema Registry automatically on the producer side, which is good. But there is one thing that doesn't work for JsonSerializer.

Let's say I have a PurchaseOrder model that looks like this:

public class PurchaseOrder
{
    public string Name { get; }
    public double Price { get; }
    public int? Quality { get; }
}

For AvroSerializer, since it doesn't support custom classes, I could implement the ISpecificRecord interface like this:

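public class PurchaseOrder : ISpecificRecord
{
    // Avro schema parsed once from a JSON string (json is defined elsewhere).
    public static readonly Schema _SCHEMA = Schema.Parse(json);

    public Schema Schema => _SCHEMA;

    // Get(int fieldPos) and Put(int fieldPos, object fieldValue) omitted here.
}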
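For completeness, a fuller sketch of such a class, assuming the Apache.Avro package. The inline schema JSON and the settable properties are illustrative assumptions, not the actual schema used in the project:

```csharp
using System;
using Avro;
using Avro.Specific;

public class PurchaseOrder : ISpecificRecord
{
    // Illustrative schema (an assumption). A nullable field is a union with "null".
    public static readonly Schema _SCHEMA = Schema.Parse(@"{
        ""type"": ""record"",
        ""name"": ""PurchaseOrder"",
        ""fields"": [
            { ""name"": ""name"", ""type"": ""string"" },
            { ""name"": ""price"", ""type"": ""double"" },
            { ""name"": ""quality"", ""type"": [""null"", ""int""], ""default"": null }
        ]
    }");

    public Schema Schema => _SCHEMA;

    public string Name { get; set; }
    public double Price { get; set; }
    public int? Quality { get; set; }

    // Returns the field value at the position given by the schema's field order.
    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return Name;
            case 1: return Price;
            case 2: return Quality;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        }
    }

    // Assigns the field value at the position given by the schema's field order.
    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: Name = (string)fieldValue; break;
            case 1: Price = (double)fieldValue; break;
            case 2: Quality = (int?)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        }
    }
}
```

The field positions in Get and Put follow the order of the "fields" array in the schema, so the two have to be kept in sync by hand.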

This is fine. For JsonSerializer, however, I realized it supports custom classes as long as I use Newtonsoft.Json attributes like this:

public class PurchaseOrder
{
    [JsonRequired]
    [JsonProperty("name")]
    public string Name { get; }

    [JsonRequired]
    [JsonProperty("price")]
    public double Price { get; }

    [JsonProperty("quality")]
    public int? Quality { get; }
}

That registers the schema in Schema Registry for the corresponding topic on the producer side. I am also using Flink SQL in the Confluent web UI (Confluent Console), and when Quality is declared as int? (nullable), Flink SQL cannot query the topic when I run select * from topic_name; the weird thing is that string? Quality works. The JSON schema for PurchaseOrder looks like this when Quality is an integer:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "PurchaseOrderJson",
  "type": "object",
  "additionalProperties": false,
  "required": [ "name", "price"],
  "properties": {
    "name": { "type": "string" },
    "price": {
      "type": "number",
      "format": "double"
    },
    "quality": {
      "type": [ "integer", "null" ],
      "format": "int32"
    }
  }
}
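For reference, a minimal sketch of the kind of producer setup that auto-registers a schema along the lines of the one above. It assumes the Confluent.Kafka and Confluent.SchemaRegistry.Serdes.Json packages; the endpoints, the topic name "purchase-orders", and the settable properties on the model are placeholders for illustration, not the real configuration:

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Newtonsoft.Json;

public class PurchaseOrder
{
    [JsonRequired]
    [JsonProperty("name")]
    public string Name { get; set; }

    [JsonRequired]
    [JsonProperty("price")]
    public double Price { get; set; }

    // Nullable and not required, so the generated schema allows null here.
    [JsonProperty("quality")]
    public int? Quality { get; set; }
}

public static class Program
{
    public static async Task Main()
    {
        // Placeholder endpoints (assumptions).
        var registryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };
        var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // With AutoRegisterSchemas enabled, the serializer registers the
        // schema generated from the class when it first serializes a value.
        var serializerConfig = new JsonSerializerConfig { AutoRegisterSchemas = true };

        using var schemaRegistry = new CachedSchemaRegistryClient(registryConfig);
        using var producer = new ProducerBuilder<string, PurchaseOrder>(producerConfig)
            .SetValueSerializer(new JsonSerializer<PurchaseOrder>(schemaRegistry, serializerConfig))
            .Build();

        var order = new PurchaseOrder { Name = "widget", Price = 9.99, Quality = null };

        var result = await producer.ProduceAsync(
            "purchase-orders", // hypothetical topic name
            new Message<string, PurchaseOrder> { Key = order.Name, Value = order });

        Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
    }
}
```

The properties have setters here only so the object initializer compiles; the model in the question uses get-only properties.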

It seems that Flink SQL cannot deserialize the nullable integer field, while it can handle the nullable string. I tried different settings on the JSON attributes of the public properties, e.g. DefaultValueHandling = DefaultValueHandling.Include, but none of them works as expected for the int data type.

By the way, when I run select * from topic_name I just get Internal Server Error, even though the topic with the above schema contains no messages at all, which is really weird. Here is the string version of the Quality field, which works as expected:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "PurchaseOrderJson",
  "type": "object",
  "additionalProperties": false,
  "required": [ "name", "price"],
  "properties": {
    "name": { "type": "string" },
    "price": {
      "type": "number",
      "format": "double"
    },
    "quality": {
      "type": [ "string", "null" ],
      "format": "int32"
    }
  }
}