Postgres to Postgres using Debezium connector

I have an issue with Postgres connectors.

These are my logs:

cdc-mysql-debezium-postgresql-connect-1  | 2024-01-06 00:41:04,184 INFO   Postgres|dbserver1|streaming  Initializing PgOutput logical decoder publication   [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]

postgres                                 | 2024-01-06 00:41:04.188 UTC [51] LOG:  starting logical decoding for slot "1244578"

postgres                                 | 2024-01-06 00:41:04.188 UTC [51] DETAIL:  Streaming transactions committing after 0/15857D0, reading WAL from 0/1585798.

postgres                                 | 2024-01-06 00:41:04.188 UTC [51] STATEMENT:  START_REPLICATION SLOT "1244578" LOGICAL 0/1586678 ("proto_version" '1', "publication_names" 'dbz_publication', "messages" 'true')

postgres                                 | 2024-01-06 00:41:04.188 UTC [51] LOG:  logical decoding found consistent point at 0/1585798

postgres                                 | 2024-01-06 00:41:04.188 UTC [51] DETAIL:  There are no running transactions.

postgres                                 | 2024-01-06 00:41:04.188 UTC [51] STATEMENT:  START_REPLICATION SLOT "1244578" LOGICAL 0/1586678 ("proto_version" '1', "publication_names" 'dbz_publication', "messages" 'true')

cdc-mysql-debezium-postgresql-connect-1  | 2024-01-06 00:41:04,199 INFO   Postgres|dbserver1|streaming  Requested thread factory for connector PostgresConnector, id = dbserver1 named = keep-alive   [io.debezium.util.Threads]

cdc-mysql-debezium-postgresql-connect-1  | 2024-01-06 00:41:04,199 INFO   Postgres|dbserver1|streaming  Creating thread debezium-postgresconnector-dbserver1-keep-alive   [io.debezium.util.Threads]

cdc-mysql-debezium-postgresql-connect-1  | 2024-01-06 00:41:04,201 INFO   Postgres|dbserver1|streaming  Processing messages   [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]

cdc-mysql-debezium-postgresql-connect-1  | 2024-01-06 00:41:04,234 INFO   Postgres|dbserver1|streaming  Message with LSN 'LSN{0/15866D8}' arrived, switching off the filtering   [io.debezium.connector.postgresql.connection.WalPositionLocator]
postgres                                 | 2024-01-06 00:43:19.634 UTC [27] LOG:  checkpoint starting: time

postgrestwo                              | 2024-01-06 00:43:19.612 UTC [27] LOG:  checkpoint starting: time

postgrestwo                              | 2024-01-06 00:43:19.697 UTC [27] LOG:  checkpoint complete: wrote 3 buffers (0.0%); 0 WAL file(s) added, 0 removed, 0 recycled; write=0.028 s, sync=0.008 s, total=0.085 s; sync files=2, longest=0.007 s, average=0.004 s; distance=0 kB, estimate=0 kB; lsn=0/195AED8, redo lsn=0/195AEA0

postgres                                 | 2024-01-06 00:43:19.883 UTC [27] LOG:  checkpoint complete: wrote 5 buffers (0.0%); 0 WAL file(s) added, 0 removed, 0 recycled; write=0.218 s, sync=0.006 s, total=0.250 s; sync files=4, longest=0.004 s, average=0.002 s; distance=1 kB, estimate=1 kB; lsn=0/1586C58, redo lsn=0/1586C20

kafka                                    | [2024-01-06 00:43:36,726] INFO [Controller id=1001] Processing automatic preferred replica leader election (kafka.controller.KafkaController)

kafka                                    | [2024-01-06 00:43:36,727] TRACE [Controller id=1001] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)

kafka                                    | [2024-01-06 00:43:36,729] DEBUG [Controller id=1001] Topics not in preferred replica for broker 1001 Map() (kafka.controller.KafkaController)

kafka                                    | [2024-01-06 00:43:36,729] TRACE [Controller id=1001] Leader imbalance ratio for broker 1001 is 0.0 (kafka.controller.KafkaController)

cdc-mysql-debezium-postgresql-connect-1  | 2024-01-06 00:43:38,184 INFO   ||  [AdminClient clientId=adminclient-8] Node -1 disconnected.   [org.apache.kafka.clients.NetworkClient]

cdc-mysql-debezium-postgresql-connect-1  | 2024-01-06 00:43:44,192 INFO   ||  [AdminClient clientId=connector-adminclient-postgres-connector-0] Node -1 disconnected.   [org.apache.kafka.clients.NetworkClient]

This is my connector for Postgres:

{
    "name": "postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "192.168.80.1",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.server.id": "184054", 
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "database.include.list": "postgres", 
        "database.history.kafka.bootstrap.servers": "192.168.80.1:9092", 
        "tasks.max": "1",
        "slot.name":"1244578",
        "plugin.name" : "pgoutput",
        "table.include.list":"postgres.mytable",
        "database.history.kafka.topic": "postgres.mytable", 
        "include.schema.changes": "true",
        "snapshot.mode": "always",
        "snapshot.locking.mode": "none",
        "transforms":"unwrap",
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.add.fields":"op,source.ts_ms",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable":"false",
        "value.converter.schemas.enable": "false"
    }
}

And this is my JDBC sink:

{
    "name": "jdbc-postgres-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "mytable",
        "connection.url": "jdbc:postgresql://192.168.80.1:5433/postgres?user=postgresuser&password=postgrespw",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "id",
        "pk.mode": "record_key"
    }
}

This is the table:

postgres=# \d mytable;
                    Table "public.mytable"
 Column  |       Type        | Collation | Nullable | Default
---------+-------------------+-----------+----------+---------
 id      | integer           |           | not null |
 message | character varying |           |          |
Indexes:
    "mytable_pkey" PRIMARY KEY, btree (id)
Publications:
    "dbz_publication"

I tried these configs, but no table was created in the second database. I also tried to consume data from the topic, but I did not receive any data. I want to create the same table, with the same fields, in the second database.
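One check that may help narrow this down is comparing the topic the source connector is expected to write to with the topic the sink subscribes to. The sketch below is in Python and assumes Debezium's default topic naming for PostgreSQL, `<prefix>.<schema>.<table>`, where the prefix comes from `topic.prefix` (or `database.server.name` on older versions). The helper names `expected_source_topics` and `unmatched_sink_topics` are hypothetical, and the sketch treats `table.include.list` entries as literal names even though Debezium interprets them as regular expressions matched against `schema.table`.

```python
def expected_source_topics(source_config):
    """Topic names a Debezium PostgreSQL source would use by default.

    Assumes the default naming scheme "<prefix>.<schema>.<table>" and treats
    each table.include.list entry as a literal "schema.table" name.
    """
    prefix = source_config.get("topic.prefix") or source_config.get("database.server.name")
    entries = source_config.get("table.include.list", "").split(",")
    tables = [entry.strip() for entry in entries if entry.strip()]
    return [f"{prefix}.{table}" for table in tables]


def unmatched_sink_topics(source_config, sink_config):
    """Sink topics that no expected source topic matches."""
    expected = set(expected_source_topics(source_config))
    entries = sink_config.get("topics", "").split(",")
    subscribed = [entry.strip() for entry in entries if entry.strip()]
    return [topic for topic in subscribed if topic not in expected]


# Only the relevant keys from the two configs posted above.
source = {
    "database.server.name": "dbserver1",
    "table.include.list": "postgres.mytable",
}
sink = {
    "topics": "mytable",
}

print(expected_source_topics(source))       # ['dbserver1.postgres.mytable']
print(unmatched_sink_topics(source, sink))  # ['mytable']
```

With the values posted here, the sink's `mytable` does not match any topic the source would produce. Separately, `\d mytable` reports the table as `public.mytable`, so an include entry of `postgres.mytable` (database name rather than schema name) may not match the table at all; `public.mytable` would be the form to try, which would give a topic named `dbserver1.public.mytable`.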
