How to use a single Reroute for all topics in the Debezium PostgresConnector (Kafka Connect)

I have several tables in PostgreSQL with names:

 mdm.table1_a 
 mdm.table2_a_b 
 mdm.table3_a
 mdm.table4_a_b_c
 mdm.table5.a_b
 mdm.table6_a_b_c

The Debezium connector for PostgreSQL creates one topic per table, named after the table and prefixed with topic.prefix (e.g. "ips"):

 ips.mdm.table1_a 
 ips.mdm.table2_a_b 
 ips.mdm.table3_a
 ips.mdm.table4_a_b_c
 ips.mdm.table5.a_b
 ips.mdm.table6_a_b_c

But I need to replace the underscores with hyphens. To do this, I use the topic routing SMT (ByLogicalTableRouter), configured as Reroute transforms:

    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)",       
    "transforms.Reroute.topic.replacement": "$1$2-$3-$4",       
    "transforms.Reroute2.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute2.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)",
    "transforms.Reroute2.topic.replacement": "$1$2-$3",
    "transforms.Reroute3.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute3.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)",
    "transforms.Reroute3.topic.replacement": "$1$2-$3-$4-$5"

As you can see, I configure the Reroute transform three times, once for each number of underscores in the table name. Is it possible to combine them all into one universal Reroute?
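To make the current behaviour concrete, here is a minimal Java sketch of how I understand the three transforms to act on a topic name when applied in order. It is an assumption on my part that ByLogicalTableRouter requires the regex to match the whole topic name and then substitutes the replacement string. The class and method names (`RerouteChainSketch`, `applyChain`) are hypothetical and only simulate the chain; this is not Debezium code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RerouteChainSketch {

    // Regex/replacement pairs copied from Reroute, Reroute2 and Reroute3,
    // in the order they are listed in "transforms".
    static final String[][] RULES = {
        {"(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)", "$1$2-$3-$4"},
        {"(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)", "$1$2-$3"},
        {"(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)", "$1$2-$3-$4-$5"}
    };

    // Applies each rule in turn. ASSUMPTION: a rule only fires when the
    // regex matches the entire topic name; otherwise the name passes through.
    static String applyChain(String topic) {
        String current = topic;
        for (String[] rule : RULES) {
            Matcher matcher = Pattern.compile(rule[0]).matcher(current);
            if (matcher.matches()) {
                current = matcher.replaceFirst(rule[1]);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        String[] topics = {
            "ips.mdm.table1_a",
            "ips.mdm.table2_a_b",
            "ips.mdm.table4_a_b_c"
        };
        for (String topic : topics) {
            System.out.println(topic + " -> " + applyChain(topic));
        }
    }
}
```

Under that assumption, each topic is rewritten by exactly one of the three rules (the one whose underscore count matches) and passes through the other two unchanged, which is why I currently need one Reroute per underscore count.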

Full connector configuration:

  {
    "name": "debezium_mdm",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "pg-wilddev.site1.com",
        "database.port": "5432",
        "database.dbname": "mdm",
        "database.user": "debezium",
        "database.password": "",
        "schema.include.list": "mdm",   
        "table.include.list": "mdm.table1_a,mdm.table2_a_b,mdm.table3_a,mdm.table4_a_b_c,mdm.table5.a_b,mdm.table6_a_b_c",
        "topic.prefix": "ips",
        "slot.name": "dbz_mdm",
        "publication.name": "dbz_mdm",
        "plugin.name": "pgoutput",
        "time.precision.mode": "connect",
        "heartbeat.interval.ms": "5000",
        "retriable.restart.connector.wait.ms": "100000",
        "topic.creation.default.partitions": "3",
        "topic.creation.default.replication.factor": "3",
        "topic.creation.groups": "mdm", 
        "topic.creation.mdm.partitions": "2",       
        "topic.creation.mdm.replication.factor": "3",
        "topic.creation.mdm.compression.type": "lz4",
        "topic.creation.mdm.include": ".*mdm.*",
        "topic.creation.mdm.cleanup.policy": "compact",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "transforms": "unwrap, extractToKey, Reroute, Reroute2, Reroute3",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.delete.tombstone.handling.mode": "drop",
        "transforms.extractToKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractToKey.field": "id",
        "predicates": "topicNameMatch",
        "predicates.topicNameMatch.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
        "predicates.topicNameMatch.pattern": "ips.mdm.*",
        "transforms.extractToKey.predicate": "topicNameMatch",
        "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Reroute.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)",       
        "transforms.Reroute.topic.replacement": "$1$2-$3-$4",       
        "transforms.Reroute2.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Reroute2.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)",
        "transforms.Reroute2.topic.replacement": "$1$2-$3",
        "transforms.Reroute3.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Reroute3.topic.regex": "(.*\\.mdm\\.)([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)_([a-z0-9]*)",
        "transforms.Reroute3.topic.replacement": "$1$2-$3-$4-$5"
        }
  }