Kafka: Replicate topic A to topic B while applying a transformation to the records


I need to mirror records from a topic on cluster A to a topic on cluster B while adding a field to each record as it is proxied (e.g. with InsertField).

I do not control cluster A (though I could request changes), and I have full control of cluster B.

I know that cluster A is sending serialised JSON.

I am using the MirrorMaker API with Kafka Connect to do the mirroring, and I am trying to use the InsertField transformation to add data to each record as it is proxied.

My configuration looks like this:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
topics=.*
source.cluster.alias=upstream
source.cluster.bootstrap.servers=source:9092
target.cluster.bootstrap.servers=target:9092

# ByteArrayConverter so that MirrorMaker does not re-encode messages
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter


transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
name=somerandomname

This configuration fails with the following error:

org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field insertion]

Is there a way to achieve this without writing a custom transform? (I am using Python and I am not familiar with Java.)

Thanks a lot


There are 2 best solutions below

BEST ANSWER

As of Apache Kafka 2.6.0 (the current version at the time of writing), you cannot apply the InsertField single message transform (SMT) to MirrorMaker 2.0 records.

Explanation

MirrorMaker 2.0 is built on the Kafka Connect framework and, internally, the MirrorMaker 2.0 driver sets up a MirrorSourceConnector.

Source connectors apply SMTs immediately after polling records. No converter (e.g. ByteArrayConverter or JsonConverter) is involved at this step: converters run only after the SMTs have been applied.

The SourceRecord value is represented as a byte array with the BYTES_SCHEMA schema, whereas the InsertField transformation requires Type.STRUCT for records that have a schema.

So, because the record value is not a Struct, the transformation cannot be applied and fails with the DataException shown in the question.
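
The order of operations described above can be illustrated with a small Python model. This is a toy sketch, not Kafka Connect code: `Struct`, `SourceRecord`, `poll_mirror_record`, `insert_field`, `byte_array_converter` and `run_source_pipeline` are hypothetical stand-ins for the Java classes, and the only behaviour modelled is that the transform runs before the converter and rejects values that are not a Struct.

```python
from dataclasses import dataclass
from typing import Any, Optional


class DataException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.DataException."""


@dataclass
class Struct:
    """Stand-in for a Connect Struct: a set of named fields."""
    fields: dict


@dataclass
class SourceRecord:
    """Stand-in for a Connect SourceRecord (value and its schema only)."""
    schema: Optional[str]  # e.g. "BYTES" or "STRUCT"
    value: Any


def poll_mirror_record(raw: bytes) -> SourceRecord:
    # Models the mirror source task: the value stays a byte array (BYTES schema).
    return SourceRecord(schema="BYTES", value=raw)


def insert_field(record: SourceRecord, name: str, static_value: str) -> SourceRecord:
    # Toy version of InsertField$Value for records that carry a schema.
    if not isinstance(record.value, Struct):
        raise DataException("Only Struct objects supported for [field insertion]")
    return SourceRecord(record.schema, Struct({**record.value.fields, name: static_value}))


def byte_array_converter(record: SourceRecord) -> bytes:
    # Converters run last; for mirrored records this step is never reached.
    if not isinstance(record.value, bytes):
        raise DataException("ByteArrayConverter only handles byte arrays")
    return record.value


def run_source_pipeline(raw: bytes) -> bytes:
    record = poll_mirror_record(raw)                              # 1. poll
    record = insert_field(record, "test_inser", "test_value")     # 2. SMT
    return byte_array_converter(record)                           # 3. converter


try:
    run_source_pipeline(b'{"id": 1}')
    outcome = "ok"
except DataException as exc:
    outcome = str(exc)
print(outcome)
```

In this model the same transform succeeds when it is handed a `Struct`, which is the situation InsertField is designed for.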

References

  1. KIP-382: MirrorMaker 2.0
  2. How to Use Single Message Transforms in Kafka Connect

Additional resources

  1. Docker-compose playground for MirrorMaker 2.0
6
On

As commented, the Byte Array converter carries no Struct/Schema information, so the transform you're using (adding a field) cannot be applied.

This does not mean that no transforms can be used, however.


If you're sending JSON messages, you must send schema and payload information.
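
For reference, Kafka Connect's JsonConverter with `schemas.enable=true` expects each message to be an envelope with a top-level `schema` and `payload`. The following is a minimal Python sketch of wrapping a flat JSON object that way. The helper name `wrap_with_schema` and the partial type mapping are assumptions made for illustration, and only flat objects with boolean, integer, float and string values are handled.

```python
import json

# Python type -> Connect JSON schema type name (partial mapping, flat values only).
_CONNECT_TYPES = {bool: "boolean", int: "int64", float: "double", str: "string"}


def wrap_with_schema(payload: dict) -> bytes:
    """Wrap a flat dict in the schema/payload envelope (hypothetical helper)."""
    fields = [
        {"field": name, "type": _CONNECT_TYPES[type(value)], "optional": False}
        for name, value in payload.items()
    ]
    envelope = {
        "schema": {"type": "struct", "fields": fields, "optional": False},
        "payload": payload,
    }
    return json.dumps(envelope).encode("utf-8")


message = wrap_with_schema({"id": 1, "name": "example"})
print(message.decode("utf-8"))
```

This envelope is what a JsonConverter with schemas enabled turns into a Struct. It does not change what MirrorSourceConnector passes to the transform, which remains a byte array.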