I am running a dedicated MM2 cluster in each of my 4 data centers (DCs). All of them are designed to work in Active/Active mode, which means any of the 4 DCs should be able to replicate data to any other. Because of this setup, data arriving in topic 'A' in a given DC may have been produced in that DC or may be a replicated copy from another DC. I need to be able to tell the two apart and either let the record be replicated onward to other DCs or stop it. My idea is to have the producer in the originating DC add an origin header to each record, which I can then check to keep data from being replicated more than once along the chain of replication flows.
This requires me to control which records get written to the target cluster. I have tried the approaches below, but neither works.
- Extend the MirrorSourceConnector and supply it as connector.class in connect-mirror-maker.properties. However, this approach causes MM2 to replicate the messages three times, since MM2 also uses my custom connector class when configuring the MirrorHeartbeatConnector and the MirrorCheckpointConnector, effectively creating 3 source connectors.
connector.class=com.visa.CustomMirrorSourceConnector
- Define a custom transform filter to read the record header and filter out unwanted records. Below is just a sample transform to see whether MM2 honors it; I don't intend to actually use this one.
transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
However, I get the below error.
org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Missing required configuration "transforms.InsertSource1.type" which has no default value.
Invalid value null for configuration transforms.InsertSource1.type: Not a Transformation
You can also find the above list of errors at the endpoint /connector-plugins/{connectorType}/config/validate
It seems to me that both the above approaches are only possible when running a standalone MirrorMaker connector.
Is there any way to have control over what records get replicated when running a dedicated MM2 cluster?
Below is how my Active/Active replication is set up.
DC1 topic.dc1 (local) topic.dc2 (remote)
DC2 topic.dc1 (remote) topic.dc2 (local)
A producer in DC1 writes data (with origin header dc1) to topic.dc1, which then gets replicated to topic.dc1 (remote) in DC2. A separate consumer-producer pair reads data from topic.dc1 (remote) and writes it to topic.dc2 (local) for other local consumers such as Elasticsearch. Similarly, the data from topic.dc2 is replicated to topic.dc2 (remote) in DC1. This is how we are establishing Active/Active replication.
This all works fine, but let's say we have DC3 as well.
DC3 topic.dc1 (remote) topic.dc2 (remote) topic.dc3 (local).
Data from topic.dc2 (local) could make its way to DC3, and the same data could make its way there from topic.dc1 (local) as well. We want to stop one of these replications, and we intend to do this by using the origin header so that we prevent cycles. So we do need the filter predicate, but unfortunately I don't see it available in this standalone MM2 mode.
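To make the rule concrete, here is a rough sketch of the check I would like to apply to each record before it is replicated out of a DC. It is plain Java with no Kafka dependency. The header name `origin`, the class `OriginFilter`, and the choice to drop records with no origin header are all my own assumptions for illustration, not anything MM2 provides.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Hypothetical helper: decides whether a record may be replicated out of
 * the local DC, based only on its origin header.
 */
final class OriginFilter {
    // Assumed header name; the real name is whatever the producers set.
    static final String ORIGIN_HEADER = "origin";

    private OriginFilter() {}

    /**
     * Returns true only when the record was originally produced in localDc.
     * Records that were copied in from another DC (origin != localDc) are
     * rejected, so they cannot travel a second hop and arrive twice.
     */
    static boolean shouldReplicate(Map<String, byte[]> headers, String localDc) {
        byte[] raw = headers.get(ORIGIN_HEADER);
        if (raw == null) {
            // Assumption: an untagged record is not replicated at all.
            return false;
        }
        String origin = new String(raw, StandardCharsets.UTF_8);
        return origin.equals(localDc);
    }
}
```

With this rule, a record in topic.dc2 in DC2 that carries origin dc1 would be skipped when replicating to DC3, so DC3 would receive it only once, via topic.dc1. What I am missing is a place to plug such a check into the dedicated MM2 cluster.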