I need to mirror records from a topic on cluster A to a topic on cluster B while adding a field to each record as it is proxied (e.g. with InsertField).
I do not control cluster A (but could request changes) and have full control of cluster B.
I know that cluster A is sending serialised JSON.
I am using the MirrorMaker API with Kafka Connect to do the mirroring, and I am trying to use the InsertField transformation to add data to each record as it is proxied.
My configuration looks like this:
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
topics=.*
source.cluster.alias=upstream
source.cluster.bootstrap.servers=source:9092
target.cluster.bootstrap.servers=target:9092
# Use ByteArrayConverter so that MirrorMaker does not re-encode messages
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
name=somerandomname
This configuration fails with the following error:
org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field insertion]
Is there a way to achieve this without writing a custom transform? (I am using Python and I am not familiar with Java.)
Thanks a lot
In the current version of Apache Kafka (2.6.0), you cannot apply the InsertField single message transformation (SMT) to MirrorMaker 2.0 records.
Explanation
MirrorMaker 2.0 is based on the Kafka Connect framework and, internally, the MirrorMaker 2.0 driver sets up MirrorSourceConnector.
Source connectors apply SMTs immediately after polling records. No converters (e.g. ByteArrayConverter or JsonConverter) are involved at this step: they are used only after the SMTs have been applied.
The SourceRecord value is represented as a byte array with the BYTES_SCHEMA schema. At the same time, the InsertField transformation requires Type.STRUCT for records with a schema.
So, since the record value cannot be treated as a Struct, the transformation cannot be applied.
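To make the ordering above concrete, here is a small Python sketch of the check. It is only a toy model, not Kafka Connect code: `DataException` and `insert_field` are hypothetical stand-ins written for this illustration, the schema type is passed as a plain string, and the error text for the schemaless case is illustrative. The field name and value are taken from the configuration in the question.

```python
import json


class DataException(Exception):
    """Hypothetical stand-in for org.apache.kafka.connect.errors.DataException."""


def insert_field(value, schema_type, field, static_value):
    """Toy model of InsertField$Value: only structured values are accepted.

    schema_type is None for schemaless records, otherwise a string such as
    "STRUCT" or "BYTES" (an assumption made for this sketch).
    """
    if schema_type is None:
        # Schemaless record: the value must be a map-like object.
        if not isinstance(value, dict):
            raise DataException("Only Map objects supported for [field insertion]")
    elif schema_type != "STRUCT" or not isinstance(value, dict):
        # Record with a schema: anything other than a Struct is rejected.
        raise DataException("Only Struct objects supported for [field insertion]")
    updated = dict(value)  # copy so the original record value is not mutated
    updated[field] = static_value
    return updated


# What the SMT sees from MirrorSourceConnector: raw bytes with a BYTES schema.
raw_value = json.dumps({"id": 1}).encode("utf-8")

try:
    insert_field(raw_value, "BYTES", "test_inser", "test_value")
    outcome = "inserted"
except DataException as exc:
    outcome = str(exc)

# Only once the bytes have been decoded into a structured value can a field be added.
decoded = insert_field(json.loads(raw_value), None, "test_inser", "test_value")
```

In this model the first call fails because the value is still a byte array when the transformation runs, which mirrors the error in the question. The second call succeeds only because the JSON was decoded first, a step that the MirrorMaker pipeline does not perform before the SMT.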