is transforms working in confluent.cloud(Kafka) to elastic cloud

207 Views Asked by At

I am creating the elastic sink connector in confluent.cloud(Kafka). here, if I use transforms, it is not working. will confluent.cloud(kafka) supports transforms?.

Below is my script,

{
      "name": "ElasticConnectorTest5",
      "config": {
      "topics": "enterprise.contact6.model",
      "input.data.format": "AVRO",
      "connector.class": "ElasticsearchSink",
      "name": "ElasticConnectorTest5",
      "kafka.api.key": "xxxxxxxxxxxxx",
      "kafka.api.secret": "xxxxxxxxxxxxxxxxxx",
      "connection.url": "url",
      "connection.username": "**********",
      "connection.password": "*********",
      "type.name": "_doc",
      "key.ignore": "false",
      "schema.ignore": "true",
      "tasks.max": "1",
      "errors.tolerance": "all",
      "errors.log.enable": "true",
      "errors.log.include.messages": "true",
      "behavior.on.malformed.documents": "warn",
      "transforms": "InsertMetadata,IndexName",
      "transforms.InsertMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.InsertMetadata.partition.field": "partition",
      "transforms.InsertMetadata.offset.field": "offset",
      "transforms.IndexName.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.IndexName.regex": "enterprise.(.*)",
      "transforms.IndexName.replacement": "es.$1"
  }
}
1

There are 1 best solutions below

0
On

Single Message Transforms (SMTs) are now supported by Confluent Cloud (Oct 2021):

we’re excited to announce that most of our Confluent Cloud connectors now support single message transforms, connector log events, and connector data output previews.

Note, there is word "most" in the announcement and some transforms are not yet supported:

The RegexRouter SMT is not currently supported for managed connectors. A few sink connectors do not support the following transformations. This limitation is noted in the connector Quick Start for each affected connector.

    org.apache.kafka.connect.transforms.TimestampRouter
    io.confluent.connect.transforms.MessageTimestampRouter
    io.confluent.connect.transforms.ExtractTopic$Key
    io.confluent.connect.transforms.ExtractTopic$Value

As well as some specific limitations for Elasticsearch Service Sink Connector for Confluent Cloud.