I have an issue with my Postgres connectors.
These are my logs:
cdc-mysql-debezium-postgresql-connect-1 | 2024-01-06 00:41:04,184 INFO Postgres|dbserver1|streaming Initializing PgOutput logical decoder publication [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
postgres | 2024-01-06 00:41:04.188 UTC [51] LOG: starting logical decoding for slot "1244578"
postgres | 2024-01-06 00:41:04.188 UTC [51] DETAIL: Streaming transactions committing after 0/15857D0, reading WAL from 0/1585798.
postgres | 2024-01-06 00:41:04.188 UTC [51] STATEMENT: START_REPLICATION SLOT "1244578" LOGICAL 0/1586678 ("proto_version" '1', "publication_names" 'dbz_publication', "messages" 'true')
postgres | 2024-01-06 00:41:04.188 UTC [51] LOG: logical decoding found consistent point at 0/1585798
postgres | 2024-01-06 00:41:04.188 UTC [51] DETAIL: There are no running transactions.
postgres | 2024-01-06 00:41:04.188 UTC [51] STATEMENT: START_REPLICATION SLOT "1244578" LOGICAL 0/1586678 ("proto_version" '1', "publication_names" 'dbz_publication', "messages" 'true')
cdc-mysql-debezium-postgresql-connect-1 | 2024-01-06 00:41:04,199 INFO Postgres|dbserver1|streaming Requested thread factory for connector PostgresConnector, id = dbserver1 named = keep-alive [io.debezium.util.Threads]
cdc-mysql-debezium-postgresql-connect-1 | 2024-01-06 00:41:04,199 INFO Postgres|dbserver1|streaming Creating thread debezium-postgresconnector-dbserver1-keep-alive [io.debezium.util.Threads]
cdc-mysql-debezium-postgresql-connect-1 | 2024-01-06 00:41:04,201 INFO Postgres|dbserver1|streaming Processing messages [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
cdc-mysql-debezium-postgresql-connect-1 | 2024-01-06 00:41:04,234 INFO Postgres|dbserver1|streaming Message with LSN 'LSN{0/15866D8}' arrived, switching off the filtering [io.debezium.connector.postgresql.connection.WalPositionLocator]
postgres | 2024-01-06 00:43:19.634 UTC [27] LOG: checkpoint starting: time
postgrestwo | 2024-01-06 00:43:19.612 UTC [27] LOG: checkpoint starting: time
postgrestwo | 2024-01-06 00:43:19.697 UTC [27] LOG: checkpoint complete: wrote 3 buffers (0.0%); 0 WAL file(s) added, 0 removed, 0 recycled; write=0.028 s, sync=0.008 s, total=0.085 s; sync files=2, longest=0.007 s, average=0.004 s; distance=0 kB, estimate=0 kB; lsn=0/195AED8, redo lsn=0/195AEA0
postgres | 2024-01-06 00:43:19.883 UTC [27] LOG: checkpoint complete: wrote 5 buffers (0.0%); 0 WAL file(s) added, 0 removed, 0 recycled; write=0.218 s, sync=0.006 s, total=0.250 s; sync files=4, longest=0.004 s, average=0.002 s; distance=1 kB, estimate=1 kB; lsn=0/1586C58, redo lsn=0/1586C20
kafka | [2024-01-06 00:43:36,726] INFO [Controller id=1001] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
kafka | [2024-01-06 00:43:36,727] TRACE [Controller id=1001] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
kafka | [2024-01-06 00:43:36,729] DEBUG [Controller id=1001] Topics not in preferred replica for broker 1001 Map() (kafka.controller.KafkaController)
kafka | [2024-01-06 00:43:36,729] TRACE [Controller id=1001] Leader imbalance ratio for broker 1001 is 0.0 (kafka.controller.KafkaController)
cdc-mysql-debezium-postgresql-connect-1 | 2024-01-06 00:43:38,184 INFO || [AdminClient clientId=adminclient-8] Node -1 disconnected. [org.apache.kafka.clients.NetworkClient]
cdc-mysql-debezium-postgresql-connect-1 | 2024-01-06 00:43:44,192 INFO || [AdminClient clientId=connector-adminclient-postgres-connector-0] Node -1 disconnected. [org.apache.kafka.clients.NetworkClient]
This is my connector for Postgres:
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "192.168.80.1",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.server.id": "184054",
    "database.dbname": "postgres",
    "database.server.name": "dbserver1",
    "database.include.list": "postgres",
    "database.history.kafka.bootstrap.servers": "192.168.80.1:9092",
    "tasks.max": "1",
    "slot.name": "1244578",
    "plugin.name": "pgoutput",
    "table.include.list": "postgres.mytable",
    "database.history.kafka.topic": "postgres.mytable",
    "include.schema.changes": "true",
    "snapshot.mode": "always",
    "snapshot.locking.mode": "none",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,source.ts_ms",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
And this is my JDBC sink:
{
  "name": "jdbc-postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "mytable",
    "connection.url": "jdbc:postgresql://192.168.80.1:5433/postgres?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "id",
    "pk.mode": "record_key"
  }
}
This is the table:
postgres=# \d mytable;
                     Table "public.mytable"
 Column  |       Type        | Collation | Nullable | Default
---------+-------------------+-----------+----------+---------
 id      | integer           |           | not null |
 message | character varying |           |          |
Indexes:
    "mytable_pkey" PRIMARY KEY, btree (id)
Publications:
    "dbz_publication"
I tried these configs, but no table was created in the second database. I also tried to consume data from the topic, but I did not receive any data. I want the same table, with the same fields, to be created in the second database.
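One thing that may be worth checking is whether the sink is subscribed to the topic the source connector would write to. Below is a minimal sketch of that check in Python. It assumes Debezium's default topic naming of `<server name>.<schema>.<table>` and that `table.include.list` entries are schema-qualified. The helper names (`expected_topics`, `sink_topics`, `missing_topics`) are hypothetical, and only the relevant keys from the two configs above are copied in.

```python
def expected_topics(source: dict) -> list[str]:
    """Topic names the source would use under the assumed default naming."""
    cfg = source["config"]
    # Newer Debezium versions use topic.prefix; older ones use database.server.name.
    prefix = cfg.get("topic.prefix") or cfg.get("database.server.name")
    tables = [t.strip() for t in cfg.get("table.include.list", "").split(",") if t.strip()]
    return [f"{prefix}.{table}" for table in tables]


def sink_topics(sink: dict) -> list[str]:
    """Topics the sink connector is subscribed to."""
    return [t.strip() for t in sink["config"].get("topics", "").split(",") if t.strip()]


def missing_topics(source: dict, sink: dict) -> list[str]:
    """Expected source topics that the sink does not subscribe to."""
    subscribed = set(sink_topics(sink))
    return [t for t in expected_topics(source) if t not in subscribed]


# Relevant keys only, copied from the two connector configs in the question.
source_connector = {
    "name": "postgres-connector",
    "config": {
        "database.server.name": "dbserver1",
        "table.include.list": "postgres.mytable",
    },
}
sink_connector = {
    "name": "jdbc-postgres-sink",
    "config": {"topics": "mytable"},
}

if __name__ == "__main__":
    print("source would write to:", expected_topics(source_connector))
    print("sink subscribes to:   ", sink_topics(sink_connector))
    print("not consumed by sink: ", missing_topics(source_connector, sink_connector))
```

With the values above, the two lists do not overlap, so the sink would never see the source's records. Note also that `\d mytable` reports the table as `public.mytable`, while `table.include.list` names `postgres.mytable`; under the schema-qualified assumption, those refer to different tables.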