Hi I am trying to integrate faust with fastapi endpoints following this example: toh995/fastapi-faust-example
I am working with a simple DummyOrder model
class DummyOrder(faust.Record,MyAvroModel,serializer='avro_order_codec'):
order_id: str
amount: int
I have an faust agent that yields balance
@app.agent(test_topic)
async def table_balance(orders: faust.Stream):
async for order in orders.group_by(DummyOrder.order_id):
print (f'order id: {order.order_id} has balance of {balance_order_table[order.order_id]}')
yield balance_order_table[order.order_id]
For fastapi, I have
@fastapi_app.on_event("startup")
async def startup():
#set up the faust app
worker.faust_app_for_api('faust')
faust_app = worker.get_faust_app()
print('starting client')
#start the faust app in client mode
asyncio.create_task(
faust_app.start_client()
)
print('Client Created')
@fastapi_app.get("/")
async def entrypoint():
from order.infrastructure.faust_app.tasks import order_balance
print("getting balance")
balance = await order_balance.table_balance.ask(DummyOrder(order_id='AB001', amount=0))
print(balance)
return balance
if __name__ == '__main__':
uvicorn.run("fast_api_app:fastapi_app", host="0.0.0.0", port=3000)
Then I ran both faust worker and fastapi with the following faust.App configuration
for main faust worker
app = faust.App(
id=faust_app_id,
broker=[
f'kafka://{self.bootstrap_server}'
],
broker_credentials=faust.SASLCredentials(
username=self.username,
password=self.password,
ssl_context=self.ssl_settings
),
autodiscover=True,
origin="order.infrastructure.faust_app", #mandetory if autodiscover is enabled
value_serializer='raw',
##need to set to 3 in order for faust to work. it will create a new topic
## <faust-id>-__assignor-__leader topic
topic_replication_factor=3,
topic_disable_leader=False,
topic_allow_declare = True,
)
for fastapi, I have the following configuration. I include a loop argument that looks for current event loop by using asyncio.get_running_loop()
app_api = faust.App(
id=faust_app_id,
broker=[
f'kafka://{self.bootstrap_server}'
],
broker_credentials=faust.SASLCredentials(
username=self.username,
password=self.password,
ssl_context=self.ssl_settings
),
autodiscover=True,
origin="order.infrastructure.faust_app", #mandetory if autodiscover is enabled
loop=asyncio.get_running_loop(),
value_serializer='raw',
reply_to="faust_reply_topic"
)
The problem is when the entrypoint() is triggered by hitting the root url of fastapi, the process sends out message to worker without any issue. The worker console log shows the agent stream is being triggered and executed without any problem
[2022-04-15 09:31:24,975] [53402] [WARNING] order id: AB001 has balance of 0
Then the whole app just hangs on here. fastapi never receive anything from awaiting the agent that is supposed to yield balance_order_table[order.order_id].
I am working this project with confluent cloud + self-hosted kafka cluster and both seemed to display the same behaviour.
This is caused by faust not waiting for agent/table initialisation in client-only mode. Just replace FastAPI's app startup handler with something like that:
Please note that in case you're using faust-streaming fork please make sure to use a version newer than 5th of October 2022 as it had another code issue preventing this code to work.