await Faust Agent ask() never receive from yield generator

608 Views Asked by At

Hi I am trying to integrate faust with fastapi endpoints following this example: toh995/fastapi-faust-example

I am working with a simple DummyOrder model

class DummyOrder(faust.Record,MyAvroModel,serializer='avro_order_codec'):
    order_id: str
    amount: int

I have an faust agent that yields balance

@app.agent(test_topic)
async def table_balance(orders: faust.Stream):
    async for order in orders.group_by(DummyOrder.order_id):
        print (f'order id: {order.order_id} has balance of {balance_order_table[order.order_id]}')
        yield balance_order_table[order.order_id]

For fastapi, I have

@fastapi_app.on_event("startup")
async def startup():
    #set up the faust app
    worker.faust_app_for_api('faust')
    faust_app = worker.get_faust_app()
    print('starting client')
    #start the faust app in client mode
    asyncio.create_task(
        faust_app.start_client()
    )
    print('Client Created')

@fastapi_app.get("/")
async def entrypoint():
    from order.infrastructure.faust_app.tasks import order_balance
    print("getting balance")
    balance = await order_balance.table_balance.ask(DummyOrder(order_id='AB001', amount=0))
    print(balance)
    return balance
if __name__ == '__main__':
    uvicorn.run("fast_api_app:fastapi_app", host="0.0.0.0", port=3000)

Then I ran both faust worker and fastapi with the following faust.App configuration

for main faust worker

        app = faust.App(
            id=faust_app_id,
            broker=[
                f'kafka://{self.bootstrap_server}'
            ],
            broker_credentials=faust.SASLCredentials(
                username=self.username,
                password=self.password,
                ssl_context=self.ssl_settings
            ),
            autodiscover=True,
            origin="order.infrastructure.faust_app", #mandetory if autodiscover is enabled
            value_serializer='raw',
            ##need to set to 3 in order for faust to work. it will create a new topic
            ## <faust-id>-__assignor-__leader topic
            topic_replication_factor=3,
            topic_disable_leader=False,
            topic_allow_declare = True,

        )

for fastapi, I have the following configuration. I include a loop argument that looks for current event loop by using asyncio.get_running_loop()

        app_api = faust.App(
            id=faust_app_id,
            broker=[
                f'kafka://{self.bootstrap_server}'
            ],
            broker_credentials=faust.SASLCredentials(
                username=self.username,
                password=self.password,
                ssl_context=self.ssl_settings
            ),
            autodiscover=True,
            origin="order.infrastructure.faust_app", #mandetory if autodiscover is enabled
            loop=asyncio.get_running_loop(),
            value_serializer='raw',
            reply_to="faust_reply_topic"
        )

The problem is when the entrypoint() is triggered by hitting the root url of fastapi, the process sends out message to worker without any issue. The worker console log shows the agent stream is being triggered and executed without any problem

[2022-04-15 09:31:24,975] [53402] [WARNING] order id: AB001 has balance of 0 

Then the whole app just hangs on here. fastapi never receive anything from awaiting the agent that is supposed to yield balance_order_table[order.order_id].

I am working this project with confluent cloud + self-hosted kafka cluster and both seemed to display the same behaviour.

2

There are 2 best solutions below

0
On

This is caused by faust not waiting for agent/table initialisation in client-only mode. Just replace FastAPI's app startup handler with something like that:

@app.on_event("startup")
async def startup():
    # set up the faust app
    faust_app = worker.set_faust_app_for_api()
    await faust_app.start_client()
    await asyncio.sleep(5.0) # wait for agents and table to initialise 
    await faust_app.topics.on_client_only_start() # resubscribe to topics

Please note that in case you're using faust-streaming fork please make sure to use a version newer than 5th of October 2022 as it had another code issue preventing this code to work.

0
On

If you configure logging in your FastAPI app:

import logging
logging.basicConfig(level='INFO')
...

you should see a KeyError in the Contductor when it tries to lookup a callback for the reply_to topic. The following attempts to fix the issue: https://github.com/faust-streaming/faust/pull/510

The problem is basically that the call to start the app in client-only mode was happening before the reply_to topic was registered, so the callbacks table wasn't being built in time.