Faust stream processing is blocking

39 Views Asked by At

Im trying to use faust-streaming to check topic for failed tasks on a fixed time schedule for up to 5 times.

The code looks something like that:

@app.timer(interval=5)
async def process():
    stream = topic.stream()
    async for message in stream:
        do_something(message)
    do_some_post_processing()

It seems like async iterating over the stream never stops, even if topic is empty and all messages were already processed. The do_some_post_processing is never called, and the timer doesnt work on schedule, its just async for running forever. Is this expected behaviour in faust, or is there some way to detect no more messages and break from the loop?

0

There are 0 best solutions below