I have a table in PostgreSQL with the following schema:
Table "public.kc_ds"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+-----------------------+-----------+----------+-----------------------------------+----------+--------------+-------------
id | integer | | not null | nextval('kc_ds_id_seq'::regclass) | plain | |
num | integer | | not null | | plain | |
text | character varying(50) | | not null | | extended | |
Indexes:
"kc_ds_pkey" PRIMARY KEY, btree (id)
Publications:
"dbz_publication"
When I run a Debezium source connector for this table that uses io.confluent.connect.avro.AvroConverter
and Schema Registry, it creates a Schema Registry schema that looks like this (some fields are omitted here; a sketch of my connector config follows the schema):
"fields":[
{
"name":"before",
"type":[
"null",
{
"type":"record",
"name":"Value",
"fields":[
{
"name":"id",
"type":"int"
},
{
"name":"num",
"type":"int"
},
{
"name":"text",
"type":"string"
}
],
"connect.name":"xxx.public.kc_ds.Value"
}
],
"default":null
},
{
"name":"after",
"type":[
"null",
"Value"
],
"default":null
},
]
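
For context, I register the connector with a config along these lines (a sketch; the connector name, hosts, credentials, and the Schema Registry URL are placeholders):

{
  "name": "kc-ds-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "publication.name": "dbz_publication",
    "database.hostname": "<postgres-host>",
    "database.port": "5432",
    "database.user": "<user>",
    "database.password": "<password>",
    "database.dbname": "<db>",
    "database.server.name": "xxx",
    "table.include.list": "public.kc_ds",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "<schema-registry-url>"
  }
}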
The messages in my Kafka topic that are produced by Debezium look like this (some fields are omitted):
{
  "before": null,
  "after": {
    "xxx.public.kc_ds.Value": {
      "id": 2,
      "num": 2,
      "text": "text version 1"
    }
  }
}
When I INSERT or UPDATE, "before" is always null and "after" contains my data; when I DELETE, the inverse holds true: "after" is null and "before" contains the data (although all of its fields are set to default values).
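
For example, a DELETE of the row above yields an event shaped roughly like this (a sketch; in my understanding the key column typically keeps its value while the other columns fall back to type defaults, depending on the table's REPLICA IDENTITY):

{
  "before": {
    "xxx.public.kc_ds.Value": {
      "id": 2,
      "num": 0,
      "text": ""
    }
  },
  "after": null
}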
Question #1: Why does Kafka Connect create a schema with "before" and "after" fields? Why do those fields behave in such a weird way?
Question #2: Is there a built-in way to make Kafka Connect send flat messages to my topics while still using Schema Registry? Please note that the Flatten transform is not what I need: if enabled, I will still have the "before" and "after" fields.
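
For reference, this is the kind of Flatten configuration I mean (a sketch; the alias "flatten" is arbitrary). The envelope survives as prefixed columns like "before_id" and "after_id" rather than going away:

{
  "transforms": "flatten",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_"
}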
Question #3 (not actually hoping for anything, but maybe someone knows): I need to flatten my messages because I read the data from my topics with HudiDeltaStreamer, which seems to expect flat input data. The "before" and "after" fields end up as separate object-like columns in the resulting .parquet files. Does anyone have any idea how HudiDeltaStreamer is supposed to integrate with messages produced by Kafka Connect?