Faust: asynchronous Kafka message processing concurrency is not working


I'm trying to read data from a Kafka topic and call a REST API asynchronously with the data I fetch from the topic. The REST API responds instantly if the message is Meher; otherwise the response takes 5 seconds.

Kafka data:

Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher

Below is the code:

import aiohttp
import faust

app = faust.App(
    'faustApp',
    broker="kafka://localhost:9092",
    value_serializer='raw',
)

app_topic = app.topic('topic_base')


@app.agent(app_topic, concurrency=1)
async def imports_news(articles):
    async for article in articles:
        # With value_serializer='raw', each message value arrives as bytes
        val = article.decode('utf-8')
        url = 'http://0.0.0.0:5050/' + val
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                data = await resp.text()
                print(data)


if __name__ == '__main__':
    app.main()

Current output:

Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!

Expected output:

Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!

The expectation is that all the REST calls run concurrently, so the instant responses are printed first and the delayed responses come after them. Currently, however, the messages are processed sequentially.

If I increase the concurrency to 5, it gives the expected output, but it should also work with concurrency 1. I'm not sure whether I'm missing something. Any help with this?

Update 1:

I tried the same thing with plain Python asyncio, and it works as expected:

import asyncio

import aiohttp


async def get_player(player_name):
    url = 'http://0.0.0.0:5050/' + player_name
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.text()
    print(data)


async def main():
    # 42 names, alternating "Meher" and "Waldo"
    player_args = ["Meher", "Waldo"] * 21
    # gather() wraps each coroutine in a task, so all requests run concurrently
    await asyncio.gather(*(get_player(args) for args in player_args))


asyncio.run(main())
Answer:

From the Faust documentation (https://faust.readthedocs.io/en/latest/userguide/agents.html#id5), it seems that each agent instance handles one element of the stream at a time. Stream iteration is not parallelized over the available elements; a single agent instance handles the stream elements one by one, in order.

If you await something while processing an element of the stream, the agent instance will not move on to the next element, even if one is available, until processing of the current element is done. Awaiting an operation yields control to the event loop, so other tasks can run in the meantime, but it does not "unlock" this agent: the agent will not jump to the next stream element and then resume the first one once the awaited operation completes.
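A minimal sketch of that behaviour in plain asyncio, with no Kafka involved. `fake_stream` and `fake_rest_call` are hypothetical stand-ins for the Faust stream and the aiohttp request, and the 5-second delay is shortened to `DELAY`. Because each iteration awaits the call before the loop fetches the next item, the results come back in input order, just like the agent with `concurrency=1`.

```python
import asyncio
import time

# Hypothetical stand-in for the REST call: "Meher" answers at once,
# any other name takes DELAY seconds (5 s in the question, shortened here).
DELAY = 0.2


async def fake_rest_call(name: str) -> str:
    if name != "Meher":
        await asyncio.sleep(DELAY)
    return f"Hello {name}!"


async def fake_stream(names):
    # Async generator playing the role of the Faust stream.
    for name in names:
        yield name


async def single_agent(names) -> list:
    results = []
    async for name in fake_stream(names):
        # The await suspends this coroutine; the loop cannot fetch the
        # next item until the awaited call has returned.
        results.append(await fake_rest_call(name))
    return results


if __name__ == "__main__":
    start = time.perf_counter()
    print(asyncio.run(single_agent(["Waldo", "Meher"] * 5)))
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

The printed list follows the input order, and the elapsed time grows with the number of slow messages, since every slow call is waited for in turn.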

On the other hand, if you set concurrency=5, you get 5 instances of the agent, each of which can take an item from the stream and process it at the same time as the others (concurrently on the worker's event loop, rather than in separate threads).
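As an analogy for `concurrency=N` (not a description of Faust's internals), the sketch below models each agent instance as a worker task that pulls one message at a time from a shared `asyncio.Queue`. `worker`, `run_with_concurrency`, `fake_rest_call` and `DELAY` are hypothetical names for this illustration. Each worker is still sequential, but while one waits on a slow call, the others keep taking messages.

```python
import asyncio
import time

DELAY = 0.2  # shortened stand-in for the 5 s slow response


async def fake_rest_call(name: str) -> str:
    if name != "Meher":
        await asyncio.sleep(DELAY)
    return f"Hello {name}!"


async def worker(queue: asyncio.Queue, results: list) -> None:
    # Each worker handles one message at a time, like one agent instance.
    while True:
        name = await queue.get()
        try:
            results.append(await fake_rest_call(name))
        finally:
            queue.task_done()


async def run_with_concurrency(names, concurrency: int) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for name in names:
        queue.put_nowait(name)
    results: list = []
    workers = [asyncio.create_task(worker(queue, results))
               for _ in range(concurrency)]
    await queue.join()  # wait until every message has been processed
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    start = time.perf_counter()
    print(asyncio.run(run_with_concurrency(["Waldo", "Meher"] * 5, 5)))
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

With 5 workers the slow calls overlap, so the total time stays close to a single `DELAY`, and the fast responses no longer have to wait behind the slow ones.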

asyncio.gather works because it wraps each coroutine in a task, so all the requests run concurrently, and then waits for all of their results.
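If the goal is to keep a single consumer loop and still overlap the requests, one option is to apply the same idea as `asyncio.gather` inside the loop: start a task per message instead of awaiting the call directly. The sketch below does this in plain asyncio; `fake_stream`, `fake_rest_call`, `DELAY` and the `max_in_flight` limit are hypothetical, and the semaphore only caps how many requests run at once. Inside a real Faust agent the loop would then move on before each request finishes, so ordering and message-acknowledgement behaviour change and should be checked against the Faust docs before relying on this. Faust streams also offer `take()` for reading messages in batches, which could be combined with `asyncio.gather` instead.

```python
import asyncio

DELAY = 0.2  # shortened stand-in for the 5 s slow response


async def fake_rest_call(name: str) -> str:
    if name != "Meher":
        await asyncio.sleep(DELAY)
    return f"Hello {name}!"


async def fake_stream(names):
    for name in names:
        yield name


async def single_consumer_with_tasks(names, max_in_flight: int = 10) -> list:
    results: list = []
    sem = asyncio.Semaphore(max_in_flight)  # cap on concurrent requests
    pending = set()

    async def handle(name: str) -> None:
        try:
            results.append(await fake_rest_call(name))
        finally:
            sem.release()  # free a slot for the next message

    async for name in fake_stream(names):
        await sem.acquire()  # blocks only when max_in_flight calls are running
        task = asyncio.create_task(handle(name))
        pending.add(task)  # keep a reference so the task is not lost
        task.add_done_callback(pending.discard)
    # Wait for the requests that are still in flight.
    await asyncio.gather(*pending)
    return results


if __name__ == "__main__":
    print(asyncio.run(single_consumer_with_tasks(["Waldo", "Meher"] * 5)))
```

Here one loop reads every message, yet all the instant "Meher" responses are collected before the delayed "Waldo" ones, which is the ordering the question expects.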