Python TCP server that both sends and/or receives data (independently) using asyncio streams?


I have a Python-based TCP server using asyncio, but it has a specific behavior: the TCP client (a PLC) connects to the app, and then either the server or the client can start a data flow. Once a data flow starts, it is finished via specifically designed ACK messages.

The TCP server part is connected to an RMQ queue; on each new message, it starts a data flow with the PLC, waits for the response, then each side sends an ACK and the flow is finished. On the PLC side, a flow can be started at any time (but if there is an ongoing flow, it is simply rejected): the TCP server does some processing, sends a response, the PLC sends an ACK, the TCP server responds with its own ACK, and the flow is finished.

The connection is kept always up.

I've implemented this using the asyncio server and protocols, but I find the code overcomplex, and I don't like how hard it is to add new features or extend functionality.

I thought I could use the streams API instead, which seems much clearer, but I can't think of a way to handle both client-initiated requests through the reader and server-initiated requests to the client. Is this possible at all?

1 Answer

Answered by Alberto:

So, after checking some possible options, I have this code, which I have verified to be working. Using a PLC simulator connected to the TCP port, I can start a data flow either from the PC to the PLC or the other way around.

class PLCRing:
    def __init__(self, port: int, host: str, messaging):
        self._port = port
        self._host = host
        self._messaging = messaging  # RMQ listener used in start()
        self._srv = None
        self._queue = asyncio.Queue()
        self._processing = asyncio.Event()
        self._stopping = asyncio.Event()

    async def start(self):
        self._srv = await asyncio.start_server(self._process, self._host, self._port)
        # Connect the listener to RabbitMQ and start processing, which will
        # push each new message into our internal queue. Keep a reference to
        # the task instead of awaiting it here, otherwise serve_forever()
        # would never run.
        await self._messaging.connect()
        self._sub = asyncio.create_task(self._messaging.process(self._queue))
        await self._srv.serve_forever()

    async def _process(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        print(f"Client connection from {writer.get_extra_info('peername')}")
        try:
            while True:
                # if we are stopping, no more processing is needed
                if self._stopping.is_set():
                    break

                # This might not be necessary, since each loop should be
                # self-contained. Sleep briefly instead of a bare `continue`,
                # which would busy-loop and starve the event loop.
                if self._processing.is_set():
                    await asyncio.sleep(0.1)
                    continue

                # First doubt: I could not find a better way to check whether
                # the PLC started a communication than trying to read data.
                try:
                    buff = await asyncio.wait_for(reader.read(5232), 0.5)
                    if not buff:
                        break  # empty read means the PLC closed the connection
                    await self._process_msg_from_plc(buff, reader, writer)
                except asyncio.TimeoutError:
                    # Check if we have something in the queue to process and send
                    if self._queue.empty():
                        continue

                    msg = await self._queue.get()
                    await self._process_msg_to_plc(msg, reader, writer)
                    self._queue.task_done()
        except asyncio.IncompleteReadError:
            # The PLC might have sent a small amount of data before EOF
            logger.warning("unexpected incomplete read")
        except asyncio.InvalidStateError:
            # The TCP connection state went wrong
            pass
        except Exception as e:
            logger.error(f"Something went wrong: {e}")
        finally:
            writer.close()
            await writer.wait_closed()

Using the code above, I can use both the writer and the reader to drive both flows the way I was expecting.

As stated, my main doubt is about the right way to check whether there is any data pending. Also, by using asyncio.wait_for with a 500 ms timeout I think I can ensure it won't block or wait endlessly, but maybe there is a better way, too?
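One alternative to polling with wait_for and a timeout is to race the socket read against the internal queue using asyncio.wait with FIRST_COMPLETED, so the handler only wakes up when one side actually has something. Below is a minimal sketch of that idea; the 5232-byte read size comes from the code above, while next_event and the tuple it returns are hypothetical names for illustration:

```python
import asyncio

async def next_event(reader: asyncio.StreamReader, queue: asyncio.Queue):
    """Return ("plc", data) or ("rmq", msg), whichever source is ready first.

    Sketch of an alternative to wait_for + TimeoutError polling: race the two
    awaitables and cancel the loser. Cancelling reader.read() here should not
    lose bytes, since StreamReader buffers incoming data until it is consumed.
    """
    read_task = asyncio.ensure_future(reader.read(5232))
    queue_task = asyncio.ensure_future(queue.get())
    done, pending = await asyncio.wait(
        {read_task, queue_task}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    # Reap the cancelled task so no "Task was destroyed" warnings are emitted
    await asyncio.gather(*pending, return_exceptions=True)
    if read_task in done:
        return ("plc", read_task.result())
    return ("rmq", queue_task.result())
```

In a long-running handler you would keep the still-pending task between iterations instead of cancelling and recreating it on every loop, but the idea is the same: no timeout tuning, and no half-second latency on queue messages.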