I need to mirror records from a topic on a cluster A to a topic on cluster B while adding a field onto the record as they are proxied (eg. InsertField
).
I am not controlling cluster A (but could require changes) and have full control of cluster B.
I know that cluster A is sending serialised JSON.
I am using the MirrorMaker API with Kafka connect to do the mirroring and I am trying to use InsertField
transformation to add data on the record as they are proxied.
My configuration looks like that:
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
topics=.*
source.cluster.alias=upstream
source.cluster.bootstrap.servers=source:9092
target.cluster.bootstrap.servers=target:9092
# ByteArrayConverter to avoid MirrorMaker to re-encode messages
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
name=somerandomname
This code will fail with an error stating:
org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field insertion]
Is there a way to achieve this without writing a custom transform (I am using Python and I am not familiar with Java)
Thanks a lot
In the current version of
Apache Kafka
(2.6.0), you cannot apply InsertField single message transformation (SMT) toMirrorMaker 2.0
records.Explanation
The
MirrorMaker 2.0
is based onKafka Connect
framework and, internally, the MirrorMaker 2.0 driver sets upMirrorSourceConnector
.Source connectors apply SMT immediately after polling records (there are no converters (e.g.
ByteArrayConverter
orJsonConverter
) at this steps: they are used after SMT has been applied).The
SourceRecord
value are represented as a byte array withBYTES_SCHEMA
schema. At the same timeInsertField
transformation requiresType.STRUCT
for records with schema.So, since record can not be determine as
Struct
, transformation is not applied.References
Additional resources