How to manually commit Kafka offsets after FileIO in Apache Beam?

I have a FileIO transform that writes a PCollection<GenericRecord> to files and returns a WriteFilesResult<DestinationT>.

I would like to add a DoFn after the file write that commits the offsets of the written records to Kafka. However, the offsets are stored in my GenericRecords, so I can no longer access them in the output of FileIO.

What is the best way to solve this?

Best answer

For anyone interested, here is how I did it:

  • Manually group the records by DestinationT with a GroupByKey.
  • For each group, collect the list of offsets, create a new key, EnrichedDestinationT, that holds them, and flatten the iterable back into individual records.
  • The PCollection entering FileIO is therefore a PCollection<KV<EnrichedDestinationT, GenericRecord>>.
  • In FileIO, .by() becomes .by(KV::getKey) and .via() becomes .via(Contextful.fn(KV::getValue), Contextful.fn(this::getSink)).
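
The grouping and re-keying steps above can be sketched in plain Java, without the Beam dependency, to show the data flow. Everything here is hypothetical and only for illustration: Rec stands in for a GenericRecord that carries its Kafka partition and offset, EnrichedDestination stands in for EnrichedDestinationT, and condensing the list of offsets to the highest offset per partition is an assumption, not something stated in the answer. In a real pipeline the same logic would presumably live in a GroupByKey followed by a DoFn.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of "enrich the destination with offsets" (no Beam types). */
class OffsetEnrichment {

    /** Hypothetical stand-in for a GenericRecord that carries its Kafka coordinates. */
    static final class Rec {
        final String destination;
        final int partition;
        final long offset;
        final String payload;

        Rec(String destination, int partition, long offset, String payload) {
            this.destination = destination;
            this.partition = partition;
            this.offset = offset;
            this.payload = payload;
        }
    }

    /** Hypothetical EnrichedDestinationT: destination plus highest offset per partition. */
    static final class EnrichedDestination {
        final String destination;
        final Map<Integer, Long> maxOffsetByPartition;

        EnrichedDestination(String destination, Map<Integer, Long> maxOffsetByPartition) {
            this.destination = destination;
            this.maxOffsetByPartition = maxOffsetByPartition;
        }
    }

    /** Group by destination, collect offsets, build the enriched key, flatten again. */
    static List<Map.Entry<EnrichedDestination, Rec>> enrich(List<Rec> records) {
        // Step 1: group records by destination (the GroupByKey step).
        Map<String, List<Rec>> groups = new LinkedHashMap<>();
        for (Rec r : records) {
            groups.computeIfAbsent(r.destination, d -> new ArrayList<>()).add(r);
        }

        List<Map.Entry<EnrichedDestination, Rec>> out = new ArrayList<>();
        for (Map.Entry<String, List<Rec>> group : groups.entrySet()) {
            // Step 2: collect the group's offsets (assumption: keep the max per partition).
            Map<Integer, Long> maxOffsets = new TreeMap<>();
            for (Rec r : group.getValue()) {
                maxOffsets.merge(r.partition, r.offset, Long::max);
            }
            EnrichedDestination key = new EnrichedDestination(group.getKey(), maxOffsets);

            // Step 3: flatten the iterable back into (enrichedKey, record) pairs.
            for (Rec r : group.getValue()) {
                out.add(new AbstractMap.SimpleImmutableEntry<>(key, r));
            }
        }
        return out;
    }
}
```

Because the offsets now travel in the destination key rather than in the records, they should still be reachable after the write: WriteFilesResult.getPerDestinationOutputFilenames() returns a PCollection of KV pairs keyed by the destination, so a DoFn placed after it can read the offsets from the key and commit them. Note that a custom destination type used with FileIO.writeDynamic() also needs a coder, set with withDestinationCoder().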