Sanic Server Python 3.5 create async queue to write to AWS SQS

1k Views Asked by At

Testing out Sanic, currently have routes that when hit triggers a write to SQS. Trying to make that write asynchronous by adding the info to a queue that is then consumed 'independently' (in a non-blocking manner) of the response returned by the Sanic Server.

Below is the code I've got so far. It makes a call to SQS however it seems I have an error with using the wrong loop / creating multiple loops -> I get an error stating "loop argument must agree with future" and the server just hangs, not returning a response at all.

Also, Sanic uses uvloop, and I'm not sure how / if I should be integrating the queue into a uvloop rather than a separate asyncio loop. The Sanic server is instantiated by passing it a uvloop (uvloop.new_event_loop()).

import asyncio
asyncio_loop = asyncio.get_event_loop
async_queue = asyncio.Queue()

async def consumer_async_queue(q):
    while True:
        item = await q.get()
        if item is None:
            q.task_done()
            break
        else:
            # function call to SQS
            await record_to_sqs(item['json'], item['log_to_meta'])
            q.task_done()

async def producer_async_queue(q, item):
    await q.put(item)
    await q.put(None)
    await q.join()

async def main(q, item):
    producers = asyncio_loop.create_task(producer_async_queue(q, item))
    consumers = asyncio_loop.create_task(consumer_async_queue(q))
    await asyncio.wait([producers] + [consumers])

async def service():
    * Other Stuff *
    try:
        print(dir(asyncio_loop))
        asyncio_loop.create_task(main(async_queue, item))
        asyncio_loop.run_forever()
    except Exception as e:
        print(e)
        print("ERRORING")
    finally:
        pass
        # asyncio_loop.close()


@app.route('/api/example', methods=['GET'])
async def route(request):
    return await service(request)
0

There are 0 best solutions below