Can Kafka Sink Connectors include the record timestamp in the payload stored in the sink system?


I'm using both the S3 and JDBC sink connectors and I'm seeing some odd behavior in how my data is stored. For reconciliation purposes I would really like to keep either the Kafka ingestion time or the record production time in the data that is stored in the sink system.

I looked in the documentation but did not find this. I'm using the Confluent connectors, but I could also use other connectors such as Camel if they would allow me to do this.

Can someone give me some pointers on this?

UPDATE: Based on the good feedback from OneCricketeer, I understand I should be looking at the InsertField transform: https://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield

And I also saw this example: Kafka connect consumer referencing offset and storing in message

I will test it, but do I understand correctly that, in theory, I could do something like this:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "recordOffset"
"transforms.InsertField.partition.field": "recordPartition"
"transforms.InsertField.timestamp.field": "recordTimestamp"

And this would create three new fields in the record value, called recordOffset, recordPartition and recordTimestamp, containing the corresponding offset, partition and timestamp values.
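
For illustration, a record value that originally looked like the first JSON object below would then, assuming the configuration above, be written to the sink roughly like the second one (the customer fields and the exact values are hypothetical; the timestamp is epoch milliseconds by default):

{"customer_id": 42, "amount": 9.99}

{"customer_id": 42, "amount": 9.99, "recordOffset": 1337, "recordPartition": 0, "recordTimestamp": 1600000000000}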

And if I wanted to ensure that the values are always there, or otherwise fail, would I need to do the following (I'm not sure I understood the suffix part)?

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"!transforms.InsertField.offset.field": "recordOffset"
"!transforms.InsertField.partition.field": "recordPartition"
"!transforms.InsertField.timestamp.field": "recordTimestamp"

There is 1 answer below.

BEST ANSWER

As @OneCricketeer says, the InsertField Single Message Transform does the job here. Here's a sample S3 sink configuration using it:

{
  "connector.class"                     : "io.confluent.connect.s3.S3SinkConnector",
  "storage.class"                       : "io.confluent.connect.s3.storage.S3Storage",
  "s3.region"                           : "us-west-2",
  "s3.bucket.name"                      : "rmoff-smt-demo-01",
  "topics"                              : "customers,transactions",
  "tasks.max"                           : "4",
  "flush.size"                          : "16",
  "format.class"                        : "io.confluent.connect.s3.format.json.JsonFormat",
  "schema.generator.class"              : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "schema.compatibility"                : "NONE",
  "partitioner.class"                   : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
  "transforms"                          : "insertTS,formatTS",
  "transforms.insertTS.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertTS.timestamp.field" : "messageTS",
  "transforms.formatTS.type"            : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.formatTS.format"          : "yyyy-MM-dd HH:mm:ss:SSS",
  "transforms.formatTS.field"           : "messageTS",
  "transforms.formatTS.target.type"     : "string"
}

Note that it also uses the TimestampConverter transform to format the timestamp as a string; by default it is an epoch timestamp (milliseconds).
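
With that configuration, each JSON object written to S3 would roughly carry an extra messageTS field like the sketch below (the other fields and the exact timestamp value are hypothetical):

{"customer_id": 42, "amount": 9.99, "messageTS": "2020-09-15 14:22:08:123"}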

Your question prompted me to write this up properly and record a little tutorial - you can see it here: https://youtu.be/3Gj_SoyuTYk