I have a use case where I want to implement synchronous request/response on top of Kafka. For example, when the user sends an HTTP request, I want to produce a message on a specific Kafka input topic. That message triggers a dataflow that eventually produces a response on an output topic. I then want to consume the message from the output topic and return the response to the caller.

The workflow is: HTTP Request -> produce message on input topic -> (consume message from input topic -> app logic -> produce message on output topic) -> consume message from output topic -> HTTP Response.

To implement this, when the first HTTP request arrives I want to create a consumer on the fly that consumes from the output topic, and I want it to be ready before I produce a message on the input topic. Otherwise there is a possibility that messages on the output topic are "lost". In my case consumers have a random group.id and auto.offset.reset = latest, for application reasons.

My question is how I can make sure that the consumer is ready before producing messages. I make sure that I call SubscribeTopics before producing messages. But in my tests so far, when there are no committed offsets and Kafka resets offsets to latest, messages are sometimes lost and never read by my consumer, because Kafka sometimes treats the consumer as having registered after the messages were produced.

My workaround so far is to sleep for a bit after I create the consumer, to give Kafka time to finish the offset reset before I produce messages.

I have also tried to implement logic in a rebalance callback (triggered when a consumer subscribes to a topic), in which I call Assign with offset = latest for the topic partition, but this doesn't seem to have fixed my issue.

Hopefully there is a better solution out there than sleep.

There is 1 best solution below

Most HTTP client libraries have an implicit timeout, so holding the request open while you wait on Kafka is fragile. There's no guarantee your consumer will ever consume an event, or that a downstream producer will send data to the "response topic".

Instead, have your initial request immediately return a 202 Accepted status (or 400, for example, if you do request validation) with some tracking ID. Then require polling GET requests by ID for status updates, returning either a 404 status or 200 plus some status field within the response body.

You'll need a database to store intermediate state.
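
As a rough illustration of that flow, here is a minimal Go sketch using only the standard library. Everything named here is an assumption for the example, not part of the question: the `/requests` paths, the `Job` fields, the "pending"/"done" status values, and the `produce` hook, which stands in for whatever code publishes to the Kafka input topic. The in-memory `Store` is only a placeholder for a real database, and whatever consumes the output topic would call `Complete`.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"io"
	"net/http"
	"strings"
	"sync"
)

// Job is the intermediate state kept for each request (hypothetical shape).
type Job struct {
	ID     string `json:"id"`
	Status string `json:"status"` // "pending" or "done"
	Result string `json:"result,omitempty"`
}

// Store is an in-memory stand-in for the database.
type Store struct {
	mu   sync.RWMutex
	jobs map[string]Job
}

func NewStore() *Store { return &Store{jobs: make(map[string]Job)} }

// Create registers a new pending job under a random tracking ID.
func (s *Store) Create() Job {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	job := Job{ID: hex.EncodeToString(buf), Status: "pending"}
	s.mu.Lock()
	s.jobs[job.ID] = job
	s.mu.Unlock()
	return job
}

// Complete marks a job as done; the output-topic consumer would call this.
func (s *Store) Complete(id, result string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	job, ok := s.jobs[id]
	if !ok {
		return false
	}
	job.Status, job.Result = "done", result
	s.jobs[id] = job
	return true
}

// Get looks up a job by tracking ID.
func (s *Store) Get(id string) (Job, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	job, ok := s.jobs[id]
	return job, ok
}

func writeJSON(w http.ResponseWriter, code int, v any) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	json.NewEncoder(w).Encode(v)
}

// Routes wires the two endpoints. produce is a hypothetical hook that
// would publish the payload, keyed by tracking ID, to the input topic.
func Routes(s *Store, produce func(id string, body []byte) error) http.Handler {
	mux := http.NewServeMux()
	// POST /requests: accept the work and return a tracking ID right away.
	mux.HandleFunc("/requests", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 1<<20))
		if err != nil {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		job := s.Create()
		if err := produce(job.ID, body); err != nil {
			http.Error(w, "could not enqueue", http.StatusInternalServerError)
			return
		}
		writeJSON(w, http.StatusAccepted, job)
	})
	// GET /requests/{id}: 404 for unknown IDs, otherwise 200 plus status.
	mux.HandleFunc("/requests/", func(w http.ResponseWriter, r *http.Request) {
		job, ok := s.Get(strings.TrimPrefix(r.URL.Path, "/requests/"))
		if !ok {
			http.NotFound(w, r)
			return
		}
		writeJSON(w, http.StatusOK, job)
	})
	return mux
}
```

To try it, something like `http.ListenAndServe(":8080", Routes(NewStore(), produce))` inside a `main` function would serve both endpoints, with `produce` supplied by your Kafka producer code. The client then polls `GET /requests/<id>` until the status field reads "done".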