Submitting offsets to Kafka after a Storm batch


What would be the correct way to submit only the highest offset of every partition when a batch bolt finishes processing a batch? My main concern is machines dying while processing batches, as the whole shebang is going to run on AWS spot instances.

I am new to Storm development and can't seem to find an answer to what is, IMO, a pretty straightforward use of Kafka and Storm.

Scenario:

Based on the Guaranteeing Message Processing guide, let's assume that I have a stream (Kafka topic) of ("word", count) tuples and a batch bolt that processes X tuples, does some aggregation, creates a CSV file, uploads the file to HDFS/DB, and acks.

In a non-Storm "naive" implementation, I would read X messages (or read for Y seconds), aggregate, write to HDFS, and once the upload is completed, commit the latest (highest) offset for every partition to Kafka. If the machine or process dies before the commit, the next iteration will start from the previous position.
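The commit step of that naive loop could be sketched as follows. This is a pure-logic illustration, not a real Kafka API: `Rec` is a hypothetical stand-in for a Kafka `ConsumerRecord`, and the resulting map is what would be handed to the consumer's commit call after the HDFS upload succeeds.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NaiveCommit {
    // Hypothetical stand-in for a consumed Kafka record,
    // reduced to what offset tracking needs.
    record Rec(int partition, long offset) {}

    // Kafka commit values point at the *next* record to read,
    // so commit the highest seen offset + 1 for each partition.
    static Map<Integer, Long> commitOffsets(List<Rec> batch) {
        Map<Integer, Long> out = new HashMap<>();
        for (Rec r : batch) {
            out.merge(r.partition(), r.offset() + 1, Math::max);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(new Rec(0, 5), new Rec(0, 7), new Rec(1, 3));
        // After the upload completes, this map would be committed to Kafka.
        System.out.println(commitOffsets(batch)); // {0=8, 1=4}
    }
}
```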

In Storm I can create a batch bolt that anchors all of the batch's tuples and acks them at once, but I can't find a way to commit the highest offset of every partition to Kafka: the spouts are unaware of the batch, so once the batch bolt acks the tuples, every spout instance acks its tuples one by one. The way I see it, I can:

  1. Submit the offset of the acked message on every ack in the spout. This will cause many commits (every batch can be a few thousand tuples), probably out of order, and if the spout worker dies while submitting offsets, I will end up partially replaying some of the events.
  2. Same as 1, but add some local management of the highest committed offset (fixing out-of-order offset commits) and submit the highest offset seen every few seconds (reducing the high number of commits). I can still end up with partially committed offsets if the spout dies.
  3. Move the offset-submission logic into the bolts: add the partition and offset of every message to the data sent to the batch bolt, and submit the highest processed offset of every partition as part of the batch (emit to an "offset submitter" bolt at the end of the batch). This solves the offset tracking, multiple-commit, and partial-replay issues, but it adds Kafka-specific logic to the bolts, coupling the bolt code to Kafka, and generally speaking seems to me like reinventing the wheel.
  4. Go even further with the wheel reinvention and manually manage the highest processed partition-offset combination in ZooKeeper, reading this value when I initialize the spout.
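The "local offset management" from option 2 could be sketched as a small tracker that only advances the committable offset once every lower offset has been acked, so out-of-order acks never cause a gap to be committed. All names here are illustrative, not part of any Storm or Kafka API:

```java
import java.util.TreeSet;

public class OffsetTracker {
    private long committable;                             // next offset safe to commit
    private final TreeSet<Long> acked = new TreeSet<>();  // acked but not yet contiguous

    public OffsetTracker(long startOffset) {
        this.committable = startOffset;
    }

    // Record an ack; fold in any offsets that are now contiguous.
    public void ack(long offset) {
        acked.add(offset);
        while (!acked.isEmpty() && acked.first() == committable) {
            acked.pollFirst();
            committable++;
        }
    }

    // Highest offset safe to commit to Kafka (exclusive, Kafka-style).
    public long committable() {
        return committable;
    }
}
```

Committing `committable()` every few seconds is then safe regardless of ack ordering, since the tracker never advances past an unacked offset; the remaining risk is only re-processing, never skipping.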

1 Answer


There is quite a lot in your question, so I'm not sure this addresses it fully, but if you are concerned about the number of offset commits sent to Kafka (e.g. after every message), you should be able to set a batch size for consumption, for example 1000, to reduce this a lot.
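Assuming the spout is built on the plain Kafka consumer, the batch size mentioned here would map to consumer configuration along these lines (values are illustrative):

```properties
# fetch up to 1000 records per poll instead of one at a time
max.poll.records=1000
# commit manually once per batch rather than on a timer
enable.auto.commit=false
```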