How to build stateful services which perform exactly-once processing using Kafka transactions?


Please note: I added two answers below but at least in my opinion these answers are not "the whole story". Other answers are invited.


My knowledge of Kafka is such that I understand how to build stateless microservices using the consumer-producer transactions API. However, I do not understand how to extend this to stateful services.

To keep this question manageable, let us assume the question is limited to classes of services which are singletons, meaning that only one copy of a service process can be run at any one time.

Kafka consumers can operate as part of a consumer group, where multiple processes, each with their own consumer, process events from 1 or more topics. Let's exclude this class of problems from the question to keep matters simple.

In order to make the question more easily comprehensible, it makes sense to define a toy example problem. So let's do that:

Toy Example Problem

Events need to be consumed from a topic "input" and sent to a topic "output".

The input topic contains two types of events, "A" and "B". Each event has an associated timestamp and value.

This is what an example event of type "A" might look like:

{
    "type": "A",
    "timestamp": "2024-02-09 21:00:00",
    "value": 1
}
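
A corresponding event of type "B" (i.e. one with the same timestamp) is assumed to have the same shape, for example:

{
    "type": "B",
    "timestamp": "2024-02-09 21:00:00",
    "value": 2
}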

Our objective is to pair up events of type "A" and "B" and calculate the sum of the value fields.

The arrival order of events of type "A" and "B" is randomized, meaning that there is no pre-determined ordering as to whether an event of type "A" will arrive before or after the corresponding* event of type "B". (*) Corresponding = the event with the same timestamp.

Some events may be missing, meaning that for some timestamp values only one event, or possibly none at all, might be seen.

Additionally, timestamps may be out of order by some small amount. (Maybe a few minutes maximum.)

This is a good example problem because events are read individually, meaning that "A" and "B" arrive separately, so the process must maintain some state in memory. The fact that events for some timestamps may be missing, and that timestamps are not strictly ordered but may be slightly shuffled, means that a potentially large amount of state must be maintained in memory.

We can prevent this state from growing without bound by imposing an additional constraint: events older than 5 minutes expire, meaning that if an event of type "A" arrives, we wait up to 5 minutes for the corresponding "B" before discarding "A".
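
To make the amount and shape of that state concrete, here is a minimal sketch of the pairing-and-expiry logic in Java. The Event record, the PairingState class and all the names in it are illustrative assumptions rather than anything from the Kafka API; the point is that it is exactly this in-memory map which has to survive a restart for processing to remain exactly-once.

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative only: field and class names are assumptions, not part of any API.
record Event(String type, Instant timestamp, long value) {}

class PairingState {
    private static final Duration EXPIRY = Duration.ofMinutes(5);

    // Unmatched events keyed by timestamp, waiting for their counterpart.
    private final Map<Instant, Event> pending = new HashMap<>();

    // Returns the summed value if this event completes a pair, otherwise empty.
    Optional<Long> onEvent(Event e) {
        Event other = pending.remove(e.timestamp());
        if (other != null && !other.type().equals(e.type())) {
            return Optional.of(other.value() + e.value());
        }
        pending.put(e.timestamp(), e);   // no counterpart yet; keep waiting
        return Optional.empty();
    }

    // Drop unmatched events older than the 5 minute expiry window.
    void expire(Instant now) {
        pending.values().removeIf(e -> Duration.between(e.timestamp(), now).compareTo(EXPIRY) > 0);
    }
}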

All of this adds complexity to the question of how the commit logic should be written.

I don't actually know how to do it, but can offer some initial suggestions.

Suggestion 1:

  • A possible approach might be to maintain state in another store, such as on disk or in MongoDB
  • It is difficult to guarantee exactly-once processing when interfacing with some other type of system. I am not sure it can be done in general
  • It might be better to try and leverage Kafka to help solve the problem.

Suggestion 2:

  • We could add another topic "state" to store the state of our service. The typical consume-produce loop would look like this (sketched in code after the list):

  • Start transaction

  • Consume events from the "input" topic

  • Update internal state in memory

  • Produce events to "output" topic

  • Produce an event to the "state" topic containing a copy of the remaining internal state in memory

  • Commit and end transaction
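
Sketched with the plain Java clients, and assuming a consumer configured with a group.id, enable.auto.commit=false and isolation.level=read_committed, plus a producer configured with a transactional.id, the loop might look roughly like this. parse() and serializeState() are placeholders for whatever (de)serialization is used, and state is the in-memory pairing structure sketched earlier:

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

producer.initTransactions();
consumer.subscribe(List.of("input"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> rec : records) {
            // Update internal state; emit an output event when a pair completes.
            Optional<Long> sum = state.onEvent(parse(rec.value()));
            sum.ifPresent(s -> producer.send(new ProducerRecord<>("output", rec.key(), s.toString())));
        }

        // Snapshot the remaining unmatched state to the "state" topic.
        producer.send(new ProducerRecord<>("state", "snapshot", serializeState(state)));

        // Put the consumed offsets into the same transaction, so that consuming,
        // producing and the state snapshot all commit (or abort) together.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> partRecords = records.records(tp);
            offsets.put(tp, new OffsetAndMetadata(partRecords.get(partRecords.size() - 1).offset() + 1));
        }
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (KafkaException e) {
        // Fatal errors (e.g. ProducerFencedException) require closing the producer
        // instead; for transient ones, abort and let the batch be reprocessed.
        producer.abortTransaction();
    }
}

Note that poll() already returns a batch of records, so processing (for example) 100 events per transaction, as suggested below, is largely a matter of tuning max.poll.records.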

The problem with this approach is that a large amount of data might be sent to the "state" topic. The service might become very slow if large volumes of mostly repeated data have to be sent over the network.

  • It could perhaps be made more efficient by consuming and processing (for example) 100 events at a time, although this would increase input-to-output latency.
  • The transaction would span 100 events and the state topic would only contain 1% of the data volume compared to processing events individually.

This architecture still has problems, because the startup logic requires reading from the "state" topic until the final "state" record is reached. This is essentially a "seek" operation to the end of the topic, and I am not sure that operation can be performed consistently. (It might be that the only way to do it is to query the topic for its greatest offset before starting to read, and then compare the returned message offsets against it.) This is not a particularly good design and does not fit nicely with how Kafka is intended to be used.
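
For what it is worth, the "seek to the last state record" step can be approximated with the consumer API by asking for the end offset of the state topic and reading just before it. The following is only a sketch (same imports as the loop above), assuming a dedicated stateConsumer assigned to a single-partition "state" topic; deserializeState() is again a placeholder:

TopicPartition statePartition = new TopicPartition("state", 0);
stateConsumer.assign(List.of(statePartition));

long end = stateConsumer.endOffsets(List.of(statePartition)).get(statePartition);
// With a transactional producer the final offsets can be commit markers rather
// than data records, so seek back a little and keep the last record returned.
stateConsumer.seek(statePartition, Math.max(0, end - 10));

ConsumerRecord<String, String> last = null;
while (true) {
    ConsumerRecords<String, String> batch = stateConsumer.poll(Duration.ofSeconds(1));
    if (batch.isEmpty()) break;
    for (ConsumerRecord<String, String> rec : batch) {
        last = rec;
    }
}
if (last != null) {
    state = deserializeState(last.value());
}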

I am also not sure how commit logic should work when including the "state" topic. Perhaps a "commit" can be performed as soon as a new "state" is produced to this topic.

Alternatively, we could build a "following consumer" architecture, whereby the process has both a producer and a consumer for the state topic. The consumer exists for no reason other than to "read back" produced events and perform commit operations. This becomes a messy and complicated architecture which would be easy to break.

Summary

In summary, I am not sure how to, or if it is possible to, write a stateful service which performs exactly-once processing with Kafka and the transactions API, even in the restricted case where that service is a singleton. (Only a single copy of it running at once.)

I believe the second suggestion will work - but it is very "un-Kafkaesque". It does not fit nicely with the usual way services are constructed around Kafka. Typically services read from one or more topics, produce data and then commit a transaction.

With the inclusion of a "state" topic things become weird because:

  • We continually send out new data which makes previous data on the topic obsolete. We only ever care about the last data on the topic, and yet we maintain a stream of data, because this is what a Kafka topic is.
  • It is not clear how to commit efficiently in a way which does not lead to slow startup times caused by fetching out-of-date state data from the state topic.

There are 2 answers below

Answer from FreelanceConsultant

This is not really a solid answer but I didn't want to add yet more lines of text into the question.

I wonder whether I am thinking about Stateful Services the wrong way.

There does not seem to me to be an obvious, "elegant" way of using Kafka to persist the aggregated internal state of a process in general. Suggestion 2 above is as close as I have managed to get so far.

  • In particular, as the state topic grows, reading all the data from it could become a very slow operation.

I have been trying to think about Stateful Services as things which accumulate data in memory while avoiding putting it somewhere else, such as in a database, on disk, or in big blocks of aggregated data sent back to Kafka.

I am starting to think that perhaps the only sensible conclusion is that if processes maintain large amounts of state in memory then they also need to maintain that state in a resilient data store, in case the process stops running and the data in memory evaporates.

This shifted my thinking towards how to consume a single event from Kafka and update a persisted copy of the aggregated in-memory data, rather than trying to commit to Kafka in some clever way that avoids needing an on-disk or in-database copy of the process data.

So perhaps, rather than worrying about transactions, I should instead be thinking about how to:

  • read data from Kafka
  • update the in-memory data structures for the processor
  • check whether this data has already been persisted to some data store (maybe by recording the topic names, partitions and offsets of previously processed events?)
  • persist a copy of the in-memory data structures for the processor
  • commit the Kafka read

This way, even if the process dies before committing, it has a way of performing de-duplication of the previously read data if required (a rough sketch follows).
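
Roughly sketched, with the StateStore interface and the ProcessorState type being entirely hypothetical placeholders (they are not part of the Kafka client), and with the de-duplication check done before updating the in-memory state, it might look like this:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical: an external durable store (disk, MongoDB, ...) which holds the
// processor state together with the last offset that has been applied to it.
interface StateStore {
    long lastAppliedOffset(TopicPartition tp);                       // -1 if nothing stored yet
    void save(ProcessorState state, TopicPartition tp, long offset); // ideally atomic
}

void processOne(ConsumerRecord<String, String> rec, ProcessorState state,
                StateStore store, KafkaConsumer<String, String> consumer) {
    TopicPartition tp = new TopicPartition(rec.topic(), rec.partition());

    // De-duplication: skip events already folded into the persisted state. This
    // covers the case where the process died after persisting but before committing.
    if (rec.offset() <= store.lastAppliedOffset(tp)) {
        return;
    }

    state.update(rec);                    // update the in-memory data structures (placeholder)
    store.save(state, tp, rec.offset());  // persist state and offset together

    // Only now acknowledge the read back to Kafka.
    consumer.commitSync(Map.of(tp, new OffsetAndMetadata(rec.offset() + 1)));
}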

On the producer side things become trickier. Perhaps the transaction has to extend to producing as well.

I hope this makes some sense, it's nearly midnight in my TZ.

Answer from FreelanceConsultant

It seems obvious to say this is a generally hard problem.

It seems there are, broadly speaking, 3 approaches:

  • What comes out of Kafka must go back into Kafka. Using this approach we can leverage Kafka's built-in de-duplication and transactional logic. This requires storing all state for a stateful service in Kafka, as well as the input and output streams.

  • Manual de-duplication is required in other cases. This introduces additional developer overhead, in that unique keys must be created for de-duplication purposes. There is also a further problem to be solved: if a message is read from Kafka and the consumer which read it wants to commit, that event data must first be placed in some other resilient data store, because the act of committing will prevent the same data from being read again.

  • Store no state, and have the service reconstruct its internal state and any missing output by reading first the entire output history and then the entire input history. This is potentially a very slow operation, and requires quite complex internal logic to perform reconciliation within the process. It also makes committing impossible.

https://www.confluent.io/blog/chain-services-exactly-guarantees/