I have several tables in PostgreSQL with names:
mdm.table1_a
mdm.table2_a_b
mdm.table3_a
mdm.table4_a_b_c
mdm.table5.a_b
mdm.table6_a_b_c
Debezium connector for PostgreSQL creates topics with the same names as the table names with topic.prefix (e.g. "ips"):
ips.mdm.table1_a
ips.mdm.table2_a_b
ips.mdm.table3_a
ips.mdm.table4_a_b_c
ips.mdm.table5.a_b
ips.mdm.table6_a_b_c
But I need to replace the underscores with hyphens. To do this, I use SMT Tansforms Reroute.
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)",
"transforms.Reroute.topic.replacement": "$1$2-$3-$4",
"transforms.Reroute2.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute2.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)",
"transforms.Reroute2.topic.replacement": "$1$2-$3",
"transforms.Reroute3.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute3.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)",
"transforms.Reroute3.topic.replacement": "$1$2-$3-$4-$5"
As you can see, I use the transforms.Reroute
3 times. Is it possible to join them all into one universal Reroute?
Full connector configuration:
{
"name": "debezium_mdm",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "pg-wilddev.site1.com",
"database.port": "5432",
"database.dbname": "mdm",
"database.user": "debezium",
"database.password": "",
"schema.include.list": "mdm",
"table.include.list": "mdm.table1_a,mdm.table2_a_b,mdm.table3_a,mdm.table4_a_b_c,mdm.table5.a_b,mdm.table6_a_b_c",
"topic.prefix": "ips",
"slot.name": "dbz_mdm",
"publication.name": "dbz_mdm",
"plugin.name": "pgoutput",
"time.precision.mode": "connect",
"heartbeat.interval.ms": "5000",
"retriable.restart.connector.wait.ms": "100000",
"topic.creation.default.partitions": "3",
"topic.creation.default.replication.factor": "3",
"topic.creation.groups": "mdm",
"topic.creation.mdm.partitions": "2",
"topic.creation.mdm.replication.factor": "3",
"topic.creation.mdm.compression.type": "lz4",
"topic.creation.mdm.include": ".*mdm.*",
"topic.creation.mdm.cleanup.policy": "compact",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"transforms": "unwrap, extractToKey, Reroute, Reroute2, Reroute3",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.tombstone.handling.mode": "drop",
"transforms.extractToKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractToKey.field": "id"
"predicates": "topicNameMatch",
"predicates.topicNameMatch.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.topicNameMatch.pattern": "ips.mdm.*",
"transforms.extractToKey.predicate": "topicNameMatch",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)",
"transforms.Reroute.topic.replacement": "$1$2-$3-$4",
"transforms.Reroute2.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute2.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)",
"transforms.Reroute2.topic.replacement": "$1$2-$3",
"transforms.Reroute3.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute3.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)",
"transforms.Reroute3.topic.replacement": "$1$2-$3-$4-$5"
}
}