How can I create dependencies among multiple async methods in Python 3?


I have a class that contains multiple async methods, and I want to create dependencies among them. [Image: dependency graph] In this picture, both heartbeat and neighborhood_check depend on radio, and sub depends on neighborhood_check. That means that after starting radio I want to start heartbeat and neighborhood_check, and after starting neighborhood_check I want to start sub. I also have an async start function that starts the methods, and this is where I want to manage the dependencies, but I can't work out how.

Any suggestions would be appreciated.

The code of check_dependency.py is as follows:

import asyncio
import sys
import os
from util import run
class ChkDependency():
    async def radio(self):
        print("Radio is initialized")
        await asyncio.sleep(2)


    async def pub(self):
        await asyncio.sleep(2)
        print("pub is initialized")


    async def heartbeat(self):
        print("heartbeat started")
        await asyncio.sleep(2)


    async def neigh_hood_check(self):
        await asyncio.sleep(2)
        print("checking for matches in neigh list")


    async def subs(self):
        await asyncio.sleep(2)
        print("match found")
        await asyncio.sleep(2)
        print("subscribing.....")


if __name__ == '__main__':
    chkdependency = ChkDependency()
    async def start():
        while True:
            await asyncio.wait([
                chkdependency.radio(),
                chkdependency.pub(),
                chkdependency.heartbeat(),
                chkdependency.neigh_hood_check(),
                chkdependency.subs(),
            ])    

    try:
        run(
            start(),
          )
    except KeyboardInterrupt:
        print("Exiting...")
        exit()

The code of util.py is as follows:

import asyncio

def run(*args):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*args))

I tried implementing the dependencies with a semaphore, but it did not work. Please help!

Best answer

It is not clear from the question exactly what you want, so I'll make some assumptions.

  1. If you just want the methods to be executed in the given order, then just do this:

    async def start():
        while True:
            # Each await completes before the next method is started.
            await chkdependency.radio()
            await chkdependency.pub()
            await chkdependency.heartbeat()
            await chkdependency.neigh_hood_check()
            await chkdependency.subs()
    

    Note that in this case each method starts only after the previous one has finished!

  2. If you want the methods to be guaranteed to start in the specified order but be free to finish in any order, then you need synchronization primitives such as asyncio.Event:

    import asyncio
    from random import randint
    
    
    class ChkDependency:
        def __init__(self):
            self.radio_started = asyncio.Event()
            self.pub_started = asyncio.Event()
            self.heartbeat_started = asyncio.Event()
            self.neigh_hood_check_started = asyncio.Event()
            self.sub_started = asyncio.Event()
    
        async def radio(self):
            await self.radio_started.wait()
            print("1 Start radio")
            self.pub_started.set()
            await asyncio.sleep(randint(1, 5))
            print("1 Finish radio")
            self.radio_started.clear()
    
        async def pub(self):
            await self.pub_started.wait()
            print("2 Start pub")
            self.heartbeat_started.set()
            await asyncio.sleep(randint(1, 5))
            print("2 Finish pub")
            self.pub_started.clear()
    
        async def heartbeat(self):
            await self.heartbeat_started.wait()
            print("3 Start heartbeat")
            self.neigh_hood_check_started.set()
            await asyncio.sleep(randint(1, 5))
            print("3 Finish heartbeat")
            self.heartbeat_started.clear()
    
        async def neigh_hood_check(self):
            await self.neigh_hood_check_started.wait()
            print("4 Start neigh_hood_check")
            self.sub_started.set()
            await asyncio.sleep(randint(1, 5))
            print("4 Finish neigh_hood_check")
            self.neigh_hood_check_started.clear()
    
        async def subs(self):
            await self.sub_started.wait()
            print("5 Start subs")
            await asyncio.sleep(randint(1, 5))
            print("5 Finish subs")
            self.sub_started.clear()
    
        async def start(self):
            while True:
                self.radio_started.set()
                print("\nStart new cycle:")
                await asyncio.gather(
                    self.radio(),
                    self.pub(),
                    self.heartbeat(),
                    self.neigh_hood_check(),
                    self.subs()
                )
    
    
    async def main():
        chkdependency = ChkDependency()
        await chkdependency.start()
    
    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print("Exiting...")
    
  3. Keep in mind that if you do this (or the same with asyncio.wait instead of asyncio.gather):

    async def start():
        while True:
            await asyncio.gather(
                chkdependency.radio(),
                chkdependency.pub(),
                chkdependency.heartbeat(),
                chkdependency.neigh_hood_check(),
                chkdependency.subs()
            )
    

    it may look as though the methods start one after another in the listed order. That is because internally both asyncio.gather and asyncio.wait schedule them in a loop. However, this behavior is not guaranteed! In a new version of the library, or for some internal reason, the methods could start in a different order at any time. Note also that passing bare coroutines to asyncio.wait has been deprecated since Python 3.8 and is rejected from Python 3.11 on, so wrap them in tasks first if you use it.

  4. You also don't seem to need the run function from util.py, since asyncio.run (available from Python 3.7) does the same job!
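
If what you actually want is the graph from the question (heartbeat and neigh_hood_check start after radio has started, subs starts after neigh_hood_check has started), the same asyncio.Event idea can be arranged as a graph instead of a chain. The following is only a minimal sketch under some assumptions: pub has no dependencies, "started" means "reached its first line", a single cycle is run, and the start_order list is a hypothetical helper added purely to make the order visible.

```python
import asyncio


class ChkDependency:
    def __init__(self):
        # One event per method that other methods depend on.
        self.radio_started = asyncio.Event()
        self.neigh_hood_check_started = asyncio.Event()
        # Hypothetical helper: records the order in which methods start.
        self.start_order = []

    async def radio(self):
        self.start_order.append("radio")
        print("Radio is initialized")
        self.radio_started.set()  # unblocks heartbeat and neigh_hood_check
        await asyncio.sleep(0.1)

    async def pub(self):
        # Assumed to be independent of the other methods.
        self.start_order.append("pub")
        await asyncio.sleep(0.1)
        print("pub is initialized")

    async def heartbeat(self):
        await self.radio_started.wait()
        self.start_order.append("heartbeat")
        print("heartbeat started")
        await asyncio.sleep(0.1)

    async def neigh_hood_check(self):
        await self.radio_started.wait()
        self.start_order.append("neigh_hood_check")
        print("checking for matches in neigh list")
        self.neigh_hood_check_started.set()  # unblocks subs
        await asyncio.sleep(0.1)

    async def subs(self):
        await self.neigh_hood_check_started.wait()
        self.start_order.append("subs")
        print("subscribing.....")
        await asyncio.sleep(0.1)

    async def start(self):
        # Deliberately listed in a "wrong" order: the events, not the
        # argument order, decide when each method begins its work.
        await asyncio.gather(
            self.subs(),
            self.heartbeat(),
            self.neigh_hood_check(),
            self.pub(),
            self.radio(),
        )


async def main():
    # Created inside the running loop so the events belong to it.
    chkdependency = ChkDependency()
    await chkdependency.start()
    return chkdependency.start_order


if __name__ == '__main__':
    asyncio.run(main())
```

To repeat the cycle in a while True loop as in the question, the events would have to be cleared (or recreated) before each new round, since an asyncio.Event stays set until clear() is called.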