I have a table in PostgreSQL with the following schema:

                                                       Table "public.kc_ds"
 Column |         Type          | Collation | Nullable |              Default              | Storage  | Stats target | Description
--------+-----------------------+-----------+----------+-----------------------------------+----------+--------------+-------------
 id     | integer               |           | not null | nextval('kc_ds_id_seq'::regclass) | plain    |              |
 num    | integer               |           | not null |                                   | plain    |              |
 text   | character varying(50) |           | not null |                                   | extended |              |
Indexes:
    "kc_ds_pkey" PRIMARY KEY, btree (id)
Publications:
    "dbz_publication"

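The connector uses io.confluent.connect.avro.AvroConverter together with Schema Registry. For reference, it is registered with a configuration along these lines (a sketch; the connector name, hosts, and credentials are placeholders, and exact property names vary between Debezium versions):

{
  "name": "xxx-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "...",
    "database.port": "5432",
    "database.user": "...",
    "database.password": "...",
    "database.dbname": "...",
    "database.server.name": "xxx",
    "table.include.list": "public.kc_ds",
    "publication.name": "dbz_publication",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
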
When I run this connector, it creates a Schema Registry schema for the topic value that looks like this (some fields are omitted here):

"fields":[
      {
         "name":"before",
         "type":[
            "null",
            {
               "type":"record",
               "name":"Value",
               "fields":[
                  {
                     "name":"id",
                     "type":"int"
                  },
                  {
                     "name":"num",
                     "type":"int"
                  },
                  {
                     "name":"text",
                     "type":"string"
                  }
               ],
               "connect.name":"xxx.public.kc_ds.Value"
            }
         ],
         "default":null
      },
      {
         "name":"after",
         "type":[
            "null",
            "Value"
         ],
         "default":null
      }
]

The messages in my Kafka topic that are produced by Debezium look like this (some fields are omitted):

{
  "before": null,
  "after": {
    "xxx.public.kc_ds.Value": {
      "id": 2,
      "num": 2,
      "text": "text version 1"
    }
  }
}

When I INSERT or UPDATE, "before" is always null and "after" contains my data; when I DELETE, the inverse holds: "after" is null and "before" contains the data (although all of its fields are set to their type defaults rather than the deleted row's values).
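For illustration, a DELETE produces a message shaped like this (a sketch that matches the behavior described above; the values shown are the type defaults, not the deleted row's data):

{
  "before": {
    "xxx.public.kc_ds.Value": {
      "id": 0,
      "num": 0,
      "text": ""
    }
  },
  "after": null
}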

Question #1: Why does Kafka Connect create a schema with "before" and "after" fields? Why do those fields behave in such a weird way?

Question #2: Is there a built-in way to make Kafka Connect send flat messages to my topics while still using Schema Registry? Please note that the Flatten transform is not what I need: if enabled, I will still have the "before" and "after" fields.
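To be concrete, this is the kind of Flatten configuration I mean (a fragment that would go into the connector config above; the transform alias and delimiter are arbitrary):

"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"

With this enabled, the records are flat, but the data still ends up under "before_"- and "after_"-prefixed fields.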

Question #3 (not actually hoping for an answer, but maybe someone knows): I need to flatten my messages because I read the data from my topics with HudiDeltaStreamer, and that tool seems to expect flat input data: the "before" and "after" fields end up as separate object-like columns in the resulting .parquet files. Does anyone know how HudiDeltaStreamer is supposed to integrate with messages produced by Kafka Connect?
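For context, I invoke the tool roughly like this (a sketch; the bundle jar, paths, and properties file are placeholders, and flag spellings differ between Hudi versions):

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-ordering-field id \
  --target-base-path /tmp/kc_ds \
  --target-table kc_ds \
  --props kafka-source.properties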
