I'm testing out Sanic and currently have routes that, when hit, trigger a write to SQS. I'm trying to make that write asynchronous by adding the info to a queue that is then consumed independently of (i.e. without blocking) the response returned by the Sanic server.
Below is the code I've got so far. It does make the call to SQS, but I seem to be using the wrong loop or creating multiple loops: I get an error stating "loop argument must agree with future", and the server just hangs without returning a response at all.
Also, Sanic uses uvloop, and I'm not sure how (or whether) I should integrate the queue into the uvloop loop rather than a separate asyncio loop. The Sanic server is instantiated by passing it a uvloop loop (uvloop.new_event_loop()).
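For comparison on the loop question, here is a minimal sketch of a queue that is created inside the loop that runs it, rather than at import time. It uses only the standard asyncio module; `record_stub`, `sink`, `demo` and `run_demo` are hypothetical names for illustration, and `record_stub` stands in for the real SQS write. The assumption is that the same pattern would apply when the loop is the uvloop instance handed to Sanic, since the queue and its consumer then live on a single loop.

```python
import asyncio


async def record_stub(item, sink):
    # Hypothetical stand-in for the SQS write: yields once, then records the item.
    await asyncio.sleep(0)
    sink.append(item)


async def consumer(q, sink):
    # Drain the queue until the None sentinel arrives.
    while True:
        item = await q.get()
        try:
            if item is None:
                break
            await record_stub(item, sink)
        finally:
            # Always mark the item as handled so q.join() can return.
            q.task_done()


async def demo():
    # The queue is created while the loop is running, so the queue,
    # the consumer task and the producer all use the same loop.
    q = asyncio.Queue()
    sink = []
    task = asyncio.ensure_future(consumer(q, sink))
    await q.put({"json": "payload"})
    await q.put(None)
    await q.join()
    await task
    return sink


def run_demo():
    # Run the demo on a dedicated loop and close it afterwards.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(demo())
    finally:
        loop.close()
```

Calling `run_demo()` runs one item through the queue and returns the list of items the stub recorded. The actual code from the question follows.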
import asyncio

asyncio_loop = asyncio.get_event_loop()
async_queue = asyncio.Queue()
async def consumer_async_queue(q):
    while True:
        item = await q.get()
        if item is None:
            q.task_done()
            break
        else:
            # function call to SQS
            await record_to_sqs(item['json'], item['log_to_meta'])
            q.task_done()
async def producer_async_queue(q, item):
    await q.put(item)
    await q.put(None)
    await q.join()

async def main(q, item):
    producers = asyncio_loop.create_task(producer_async_queue(q, item))
    consumers = asyncio_loop.create_task(consumer_async_queue(q))
    await asyncio.wait([producers, consumers])
async def service(request):
    # ... other stuff (builds `item`) ...
    try:
        print(dir(asyncio_loop))
        asyncio_loop.create_task(main(async_queue, item))
        asyncio_loop.run_forever()
    except Exception as e:
        print(e)
        print("ERRORING")
    finally:
        pass
        # asyncio_loop.close()
@app.route('/api/example', methods=['GET'])
async def route(request):
    return await service(request)
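A minimal sketch of the "respond first, write later" behaviour described above, using only asyncio so it runs without Sanic. `Background`, `fake_write`, `handler`, `demo` and `run_demo` are hypothetical names, and `fake_write` stands in for `record_to_sqs`. The idea is one long-lived consumer task on the server's loop, with the handler only enqueuing the item and returning. In Sanic, `start()` would presumably be awaited from a `before_server_start` listener so that it runs on the server's own loop; that wiring is an assumption and is not shown here.

```python
import asyncio


class Background:
    """Hypothetical helper: one queue plus one long-lived consumer task."""

    def __init__(self, write):
        self.write = write  # coroutine function that performs the slow write
        self.queue = None
        self.task = None

    async def start(self):
        # Must be awaited from code already running on the server's loop.
        self.queue = asyncio.Queue()
        self.task = asyncio.ensure_future(self._consume())

    async def _consume(self):
        while True:
            item = await self.queue.get()
            try:
                await self.write(item)
            except Exception as exc:
                # A failed write is reported but does not stop the consumer.
                print("write failed:", exc)
            finally:
                self.queue.task_done()

    def submit(self, item):
        # Non-blocking: the caller does not wait for the write to happen.
        self.queue.put_nowait(item)

    async def stop(self):
        # Wait for pending items, then cancel the idle consumer.
        await self.queue.join()
        self.task.cancel()
        try:
            await self.task
        except asyncio.CancelledError:
            pass


async def demo():
    written = []

    async def fake_write(item):
        # Stand-in for the SQS call.
        await asyncio.sleep(0.01)
        written.append(item)

    bg = Background(fake_write)
    await bg.start()

    async def handler(payload):
        # Shape of a route handler: enqueue and return immediately.
        bg.submit(payload)
        return "accepted"

    response = await handler({"id": 1})
    written_at_response = len(written)  # nothing written yet at this point
    await bg.stop()
    return response, written_at_response, written


def run_demo():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(demo())
    finally:
        loop.close()
```

In this sketch the handler returns before the write has happened, and the write completes later on the same loop. Nothing calls `run_forever()` from inside a handler, because the loop is already running by the time a request is being served.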