I want to replicate data from one Postgres database to another Postgres database using Kafka Connect (a Debezium source connector and a JDBC sink connector), but the table never gets created in the second database. Here is my setup. The source table:
CREATE TABLE events (
  identity_id TEXT PRIMARY KEY,
  name TEXT,
  family_name TEXT,
  event_type TEXT
);
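For testing, I insert a row like this into the source table (the values are just made-up sample data), so that the source connector has a change event to capture:

-- sample row (made-up values) inserted into the SOURCE database,
-- so Debezium has something to capture
INSERT INTO events (identity_id, name, family_name, event_type)
VALUES ('id-001', 'Jane', 'Doe', 'signup');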
and these are the source and sink connector configurations:
{"name": "postgres-source",
"config": {"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"tasks.max":"1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "kafka_test",
"database.server.name": "dbserver1",
"database.whitelist": "kafka_test",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.kafka_test",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
}
}
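To verify that the source connector has actually attached to the source database, one thing worth checking (the slot name "debezium" is Debezium's default for the Postgres connector, and I have not overridden it) is the logical replication slots:

-- run against the SOURCE database; the Debezium Postgres connector
-- creates a logical replication slot, named "debezium" by default
SELECT slot_name, plugin, active
FROM pg_replication_slots;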
{"name": "lead-sink",
"config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"10",
"topics": "events",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connection.url": "jdbc:postgresql://postgres:5432/kafka_test?user=postgres&password=postgres",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.fields": "identity_id",
"pk.mode": "record_key"
}
}
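Since auto.create is enabled, the sink itself should issue the CREATE TABLE in the target database. To see whether that ever happened, I check the catalog of the second database (events is the table name I expect, assuming the sink derives it from the topic name):

-- run against the TARGET database; if the sink's auto.create fired,
-- the table should appear here
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_name = 'events';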
I have tried different approaches, from consuming the topic directly myself to using ksql, but I still cannot get the same table produced in the second Postgres database.
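For the ksql attempt, this is roughly what I run (a sketch rather than a working setup; the topic name dbserver1.public.events is my assumption, following Debezium's <server>.<schema>.<table> naming with the table in the public schema):

-- register a stream over the topic the source connector writes to;
-- the topic name here is an assumption based on Debezium's naming convention
CREATE STREAM events_stream
  WITH (KAFKA_TOPIC='dbserver1.public.events', VALUE_FORMAT='AVRO');

-- inspect the stream to see whether change events are arriving
-- (on newer ksqlDB this query needs EMIT CHANGES at the end)
SELECT * FROM events_stream;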