Run Websocket in a separate Thread, updating class attributes


I want to implement a class that can start several websockets in different threads to retrieve market data and update the class attributes. I am using the kucoin-python-sdk library for that purpose. The code below works fine in Spyder; however, when I run the script from a batch file that activates my conda environment, it fails with the following errors, repeated over and over.

<Task finished name='Task-4' coro=<ConnectWebsocket._run() done, defined at path\lib\site-packages\kucoin\websocket\websocket.py:33> exception=RuntimeError("can't register atexit after shutdown")> got an exception can't register atexit after shutdown
pending name='Task-3' coro=<ConnectWebsocket._recover_topic_req_msg() running at path\lib\site-packages\kucoin\websocket\websocket.py:127> wait_for=> cancel ok.
_reconnect over.

<Task finished name='Task-7' coro=<ConnectWebsocket._run() done, defined at path\lib\site-packages\kucoin\websocket\websocket.py:33> exception=RuntimeError("can't register atexit after shutdown")> got an exception can't register atexit after shutdown
pending name='Task-6' coro=<ConnectWebsocket._recover_topic_req_msg() running at path\lib\site-packages\kucoin\websocket\websocket.py:127> wait_for=> cancel ok.
_reconnect over.

Hence I am wondering:

  1. Does the issue come from the Kucoin package, or is my implementation of threads/asyncio incorrect?
  2. What explains the different behavior between running in Spyder and running via conda in the same environment?

Python 3.9.13 | Spyder 5.3.3 | Spyder kernel 2.3.3 | websocket 0.2.1 | nest-asyncio 1.5.6 | kucoin-python 1.0.11

Class_X.py

import asyncio
import nest_asyncio
nest_asyncio.apply()
from kucoin.client import WsToken
from kucoin.ws_client import KucoinWsClient
from threading import Thread

class class_X():
    def __init__(self):
        self.msg = ""

    async def main(self):
        async def book_msg(msg):
            self.msg = msg
        client = WsToken()
        ws_client = await KucoinWsClient.create(None, client, book_msg, private=False)
        await ws_client.subscribe('/market/level2:BTC-USDT')
        while True:
            await asyncio.sleep(20)

    def launch(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.main())


instance = class_X()
t = Thread(target=instance.launch)
t.start()

Batch

call path\anaconda3\Scripts\activate myENV
python "path1\class_X.py"
conda deactivate

Answer

I want to say it's your implementation, but I haven't tried using that client the way you're doing it. Here's a pared-down skeleton of how I'm using kucoin-python with asyncio.

import asyncio
from kucoin.client import WsToken
from kucoin.ws_client import KucoinWsClient
from kucoin.client import Market
from kucoin.client import User
from kucoin.client import Trade

async def main():
    
    async def handle_event(msg):
        if '/market/snapshot:' in msg['topic']:
            snapshot = msg['data']['data']
            ## trade logic here using snapshot data
          
        elif msg['topic'] == '/spotMarket/tradeOrders':
            print(msg['data'])

        else:
            print("Unhandled message type")
            print(msg)

    async def unsubscribeFromPublicSnapshot(symbol):
        # unsubscribe is a coroutine, so it has to be awaited
        await ksm.unsubscribe('/market/snapshot:' + symbol)
    
    async def subscribeToPublicSnapshot(symbol):
        try:
            print("subscribing to " + symbol)
            await ksm.subscribe('/market/snapshot:' + symbol)
        except Exception as e:
            print("Error subscribing to snapshot for " + symbol)
            print(e)

    pubClient = WsToken()
    print("creating websocket client")
    ksm = await KucoinWsClient.create(None, pubClient, handle_event, private=False)

    # for private topics pass private=True
    # (config is assumed to be a dict holding your API credentials, defined elsewhere)
    privateClient = WsToken(config["tradeKey"], config["tradeSecret"], config["tradePass"])
    ksm_private = await KucoinWsClient.create(None, privateClient, handle_event, private=True)
    # Always subscribe to BTC-USDT
    await subscribeToPublicSnapshot('BTC-USDT')
    # Subscribe to the currency-BTC spot market for each available currency
    for doc in tradeable_holdings:
        if doc['currency'] != 'BTC':  # Don't need to resubscribe :D
            await subscribeToPublicSnapshot(doc['currency'] + "-BTC")

    # Subscribe to spot market trade orders
    await ksm_private.subscribe('/spotMarket/tradeOrders')


if __name__ == "__main__":
    print("Step 1: Kubot initialized")
    print("Step 2: ???")
    print("Step 3: Profit")
    loopMain = asyncio.get_event_loop()
    loopMain.create_task(main())
    loopMain.run_forever()
    loopMain.close()

As you can probably guess, "tradeable_holdings" is a list of symbols I'm interested in that I already own. You'll also notice I'm using the snapshot instead of the market/ticker subscription. With the ticker updating every 100 ms, I think it could quickly run into latency and race conditions, at least until I figure out how to deal with those. So I opted for the snapshot, which only updates every 2 seconds, and for the less active coins not even that often.
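If you do want the faster ticker feed later, one common way to keep slow trade logic from falling behind is to keep only the newest message and let older unread ones be overwritten. The following is a minimal sketch of that idea, not something taken from kucoin-python: `LatestValue` and `demo` are hypothetical names, and the messages are simulated dicts rather than real ticker payloads.

```python
import asyncio


class LatestValue:
    """Holds only the most recent message; older unread ones are overwritten."""

    def __init__(self):
        self._value = None
        self._event = asyncio.Event()

    def put(self, value):
        # Overwrite whatever was stored and wake up a waiting consumer.
        self._value = value
        self._event.set()

    async def get(self):
        # Wait until at least one message has arrived since the last read.
        await self._event.wait()
        self._event.clear()
        return self._value


async def demo():
    latest = LatestValue()

    async def handle_event(msg):
        # Callback in the style of the skeleton above: it only records
        # the message, so it returns immediately.
        latest.put(msg)

    # Simulate a burst of ticks arriving before the consumer gets to run.
    for price in (100, 101, 102):
        await handle_event({"price": price})

    # The consumer sees only the newest tick.
    return await latest.get()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the callback and the consumer run on the same event loop, no lock is needed here; that would change if the consumer lived in another thread.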

Anyway, I haven't reached the point where it actually trades yet, but I'm quickly getting to that logic.

Hope this helps you figure your implementation out even though it's different.
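Coming back to the threaded version in the question, one thing worth trying: in Python 3.9, "can't register atexit after shutdown" is raised when something tries to register a threading exit hook after the main thread has already finished. Run with plain `python`, your script reaches the end of the file right after `t.start()`, while Spyder's kernel keeps the main thread alive, which would explain the different behavior. I have not verified this against kucoin-python, so treat it as a plausible cause rather than a confirmed one. Below is a minimal sketch of keeping the main thread alive with `join()`. `Feed` and `run_feed` are hypothetical names, and the coroutine is a stand-in that fakes a few messages instead of opening a websocket.

```python
import asyncio
from threading import Thread


class Feed:
    """Stand-in for the class in the question; stores the latest message."""

    def __init__(self):
        self.msg = ""

    async def main(self, updates=3):
        # Hypothetical replacement for the KuCoin subscription:
        # produce a few fake messages and then return.
        for i in range(updates):
            await asyncio.sleep(0.01)
            self.msg = f"update {i}"

    def launch(self):
        # asyncio.run creates a fresh event loop for this thread and closes it.
        asyncio.run(self.main())


def run_feed():
    feed = Feed()
    t = Thread(target=feed.launch)
    t.start()
    # Block the main thread until the worker is done, so the interpreter
    # does not begin shutting down while the event loop is still running.
    t.join()
    return feed.msg


if __name__ == "__main__":
    print(run_feed())
```

With the real websocket the coroutine never returns, so `t.join()` simply keeps the process alive until you stop it.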