asyncio requests (aiohttp, aioftp, etc) in a class-based design


I am trying to move my codebase over to asyncio with aiohttp, aioftp, etc. My understanding is that a client should be reused across multiple requests instead of opening a new one each time. For instance, if a multi-step operation requires two GETs and then a POST, I should use the same aiohttp.ClientSession() for all three.

My problem is that I have a class-based pattern that abstracts away the FTP, HTTP, etc. operations, and I'm having trouble converting it to something compatible with asyncio while preserving the original pattern.

For instance, let's say I have:

import io

import aioftp
import aiohttp

class Task1:
   def __init__(self, method):
      self.method = method

   async def doTask(self):
      status = await self.method.send("blah")
      status2 = await self.method.send("blah2")
      status3 = await self.method.send("blah3")
      if status < 300 and status2 < 300 and status3 < 300:
         return 200
      else:
         return 500

class Task2:
   def __init__(self, method):
      self.method = method

   async def doTask(self):
      status = await self.method.send("{\"myjson\":1}")
      status2 = await self.method.send("something else")
      if status < 300 and status2 < 300:
         return 200
      else:
         return 500

class HTTP:
    def __init__(self, url):
        self.url = url

    async def send(self, data):
        async with aiohttp.ClientSession() as client:
            async with client.post(self.url, data=data) as resp:
                return resp.status

class FTP:
    def __init__(self, url):
        self.url = url

    async def send(self, data):
        bytestream = io.BytesIO(data.encode('utf-8'))
        status = 500
        while True:
            try:
                async with aioftp.Client.context(self.url, 21, "", "") as client:
                    bytestream.seek(0)
                    async with client.upload_stream("data.txt") as stream:
                        while True:
                            data = bytestream.read(256)
                            if not data:
                                break
                            await stream.write(data)
                        status = 200
                        break
            except ConnectionResetError:
                return status
        return status
            

async def main(method,task):
    mymethod = None
    if method == "ftp":
        mymethod = FTP("www.some-endpoint.com")
    elif method == "http":
        mymethod = HTTP("https://webhook.site/5d1341ad-8b1b-4e4d-a61d-0756b009964e")

    mytask = None
    if task == "1":
        mytask= Task1(mymethod)
    elif task == "2":
        mytask= Task2(mymethod)

    if mytask is not None and mymethod is not None:
        res = await mytask.doTask()
        if res == 200:
            print("done")
        else:
            print("fail")

As this code is currently set up, when main runs it calls await mytask.doTask(), which opens a new aiohttp.ClientSession() or aioftp.Client for every send() call, and I've been discouraged from doing that. I'd love for only one client to be opened, but I'm worried that would require undoing all of the abstraction I have set up.

Other approaches I've seen involve implementing __aenter__() and __aexit__() for each method (HTTP, FTP) and instantiating the client there, but I don't know if that's proper in Python, and my current implementation doesn't really work. It would look something like:

    if mytask is not None and mymethod is not None:
        async with mytask.method.client:
            res = await mytask.doTask()
            if res == 200:
                print("done")
            else:
                print("fail")

Here each method implements its own property called client that main can use in an abstract way. When I try this, I keep getting an AttributeError on __aenter__, which suggests the client isn't even getting set up, as in this example:

    async def __aenter__(self):
        self.client = aiohttp.ClientSession()
        return self

Any idea what I'm doing wrong? How should I architect an abstract class-based asyncio server?
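
A possible cause of the AttributeError, going only by the snippets above: async with looks up __aenter__ and __aexit__ on the object named in the statement. Writing async with mytask.method.client therefore targets the client attribute, which is not a session yet at that point, rather than the method object that defines __aenter__. The snippet also shows no __aexit__. Below is a minimal sketch of the pattern, assuming the method object itself is used as the context manager. FakeSession is a hypothetical stand-in for aiohttp.ClientSession so the example runs offline, and run_task is an illustrative name, not part of the original code.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession (assumption, offline only)."""

    opened = 0  # counts how many sessions have been created

    def __init__(self):
        FakeSession.opened += 1
        self.closed = False

    async def post(self, url, data):
        # Pretend every request succeeds.
        return 200

    async def close(self):
        self.closed = True


class HTTP:
    def __init__(self, url):
        self.url = url
        self.client = None

    async def __aenter__(self):
        # With aiohttp this would be: self.client = aiohttp.ClientSession()
        self.client = FakeSession()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Both __aenter__ and __aexit__ are needed for `async with` to work.
        await self.client.close()
        self.client = None

    async def send(self, data):
        # Every send() reuses the one client opened in __aenter__.
        return await self.client.post(self.url, data)


class Task1:
    def __init__(self, method):
        self.method = method

    async def doTask(self):
        statuses = [await self.method.send(d) for d in ("blah", "blah2", "blah3")]
        return 200 if all(s < 300 for s in statuses) else 500


async def run_task():
    method = HTTP("https://example.invalid")
    async with method:  # the method object, not method.client
        return await Task1(method).doTask()


if __name__ == "__main__":
    print(asyncio.run(run_task()))
```

With this shape, main only needs async with mymethod: around the call to doTask(), and the task classes never see the client directly.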

Edit: Based on a comment, I did a quick hack: a global dictionary keyed by method name whose values are the clients. I then use just the client handle in the async with and pass it down to the method classes. Here's an example, though I think it's a bit janky: I wish I didn't need to change the doTask() signature to pass a client down, but I'm still investigating.

import asyncio
import aiohttp
import aioftp
import io
import sys

class Task1:
   def __init__(self, method):
      self.method = method

   async def doTask(self, client):
      self.method.client = client
      status = await self.method.send("blah")
      status2 = await self.method.send("blah2")
      status3 = await self.method.send("blah3")
      if status < 300 and status2 < 300 and status3 < 300:
         return 200
      else:
         return 500

class Task2:
   def __init__(self, method):
      self.method = method

   async def doTask(self, client):
      self.method.client = client
      status = await self.method.send("{\"myjson\":1}")
      status2 = await self.method.send("something else")
      if status < 300 and status2 < 300:
         return 200
      else:
         return 500

class HTTP:
    def __init__(self, url):
        self.url = url
        ClientList['http'] = aiohttp.ClientSession()
        self.client = None

    async def send(self, data):
        async with self.client.post(self.url, data=data) as resp:
            return resp.status

class FTP:
    def __init__(self, url):
        self.url = url
        ClientList['ftp'] = aioftp.Client.context(self.url, 21, "dlpuser", "rNrKYTX9g7z3RgJRmxWuGHbeu")
        self.client = None

    async def send(self, data):
        bytestream = io.BytesIO(data.encode('utf-8'))
        status = 500
        while True:
            try:
                bytestream.seek(0)
                async with self.client.upload_stream("data.txt") as stream:
                    while True:
                        data = bytestream.read(256)
                        if not data:
                            break
                        await stream.write(data)
                    status = 200
                    break
            except ConnectionResetError:
                return status
        return status
            

async def main(method,task):
    global ClientList
    ClientList = {}

    mymethod = None
    if method == "ftp":
        mymethod = FTP("ftp.someendpoint.com")
    elif method == "http":
        mymethod = HTTP("https://someendpoint.com")

    mytask = None
    if task == "1":
        mytask= Task1(mymethod)
    elif task == "2":
        mytask= Task2(mymethod)

    if mytask is not None and mymethod is not None:
        async with ClientList[method] as client:
            res = await mytask.doTask(client)
            if res == 200:
                print("done")
            else:
                print("fail")
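
One way the doTask() signature might stay unchanged is for each method class to own its client and delegate to the client's own async context manager, instead of going through a global dictionary. The following is a rough sketch under that assumption, not a definitive design. Method, FakeFTP, fake_client_context, and run_task are hypothetical names; fake_client_context stands in for aiohttp.ClientSession() or aioftp.Client.context(...), both of which are used with async with in the code above.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def fake_client_context(log):
    """Hypothetical stand-in for a real client context manager (assumption)."""
    log.append("open")
    try:
        yield log  # the "client" here is just the shared log list
    finally:
        log.append("close")


class Method:
    """Base class: holds one client for the duration of an `async with` block."""

    def __init__(self):
        self.client = None
        self._cm = None

    def _make_client(self):
        # Subclasses return an async context manager that yields the client.
        raise NotImplementedError

    async def __aenter__(self):
        self._cm = self._make_client()
        self.client = await self._cm.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.client = None
        # Forward any exception details to the wrapped context manager.
        return await self._cm.__aexit__(exc_type, exc, tb)


class FakeFTP(Method):
    def __init__(self, log):
        super().__init__()
        self.log = log

    def _make_client(self):
        return fake_client_context(self.log)

    async def send(self, data):
        self.client.append(f"send {data}")
        return 200


class Task2:
    def __init__(self, method):
        self.method = method

    async def doTask(self):  # no client parameter
        status = await self.method.send('{"myjson":1}')
        status2 = await self.method.send("something else")
        return 200 if status < 300 and status2 < 300 else 500


async def run_task(log):
    method = FakeFTP(log)
    async with method:
        return await Task2(method).doTask()


if __name__ == "__main__":
    print(asyncio.run(run_task([])))
```

In this sketch a real HTTP subclass would only need _make_client() to return aiohttp.ClientSession(), and an FTP subclass to return aioftp.Client.context(...), while main wraps the call in async with mymethod:.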