Using websockets with thread in Python (for cryptocurrency trading)

151 Views Asked by At

I am trying to make a cryptocurrency trading program in Upbit using Python.

I want to make two threads: In the first thread, real-time cryptocurrency will be obtained by using Upbit-websockets. In the second thread, I receive the price from the first thread and if the price is higher/lower than a specific price, I want to sell/buy it.

I made a sample code which brings the real-time cryto's price by using websockets as below:

import json
import asyncio
import nest_asyncio
from upbit.websocket import UpbitWebSocket
nest_asyncio.apply()


async def ticker(sock, payload):
    async with sock as conn:
        await conn.send(payload)
        while True:
            recv = await conn.recv()
            data = recv.decode('utf8')
            result = json.loads(data)
            print(result)


sock = UpbitWebSocket()

currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
    type='ticker',
    codes=currencies
)
payload = sock.generate_payload(
    type_fields=[type_field]
)

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete( ticker(sock, payload) )

It works well. Based on this code, I tried to move on to the next step. Before the real trading, I wanted to print the crypto's price in the second thread which is achieved by the first thread. I made the code like below but it does not work properly.

import json
import asyncio
import nest_asyncio
import threading
from upbit.websocket import UpbitWebSocket
import time
import tracemalloc
import queue
tracemalloc.start()
nest_asyncio.apply()

# Function to fetch real-time BTC price
async def ticker(sock, payload, price_queue):
    async with sock as conn:
        await conn.send(payload)
        while True:
            recv = await conn.recv()
            data = recv.decode('utf8')
            result = json.loads(data)
            price_queue.put(result)  # Put the price in the queue

# Thread to print the BTC price
def print_price_thread(price_queue):
    while True:
        price = price_queue.get()  # Get the price from the queue
        print(price)

sock = UpbitWebSocket()
currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
    type='ticker',
    codes=currencies
)
payload = sock.generate_payload(
    type_fields=[type_field]
)

price_queue = queue.Queue()  # Create a queue to pass prices between threads

# Create and start the threads
price_fetcher = threading.Thread(target=ticker, args=(sock, payload, price_queue))
price_printer = threading.Thread(target=print_price_thread, args=(price_queue,))

# Start both threads
price_fetcher.start()
price_printer.start()

I think using thread and asyncio is quite tricky. Is there any way that I can make the above code works?

1

There are 1 best solutions below

0
On

It is likely because the event loop in the ticker coroutine is not running in the thread where it was created. To make this work, you should use loop.run_in_executor to run the coroutine in the correct thread.

Here's a modified version of your code:

import json
import asyncio
import nest_asyncio
import threading
from upbit.websocket import UpbitWebSocket
import queue

nest_asyncio.apply()

# Function to fetch real-time BTC price
async def ticker(sock, payload, price_queue):
    async with sock as conn:
        await conn.send(payload)
        while True:
            recv = await conn.recv()
            data = recv.decode('utf8')
            result = json.loads(data)
            price_queue.put(result)  # Put the price in the queue

# Thread to print the BTC price
def print_price_thread(loop, sock, payload, price_queue):
    asyncio.set_event_loop(loop)
    loop.run_until_complete(ticker(sock, payload, price_queue))

sock = UpbitWebSocket()
currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
    type='ticker',
    codes=currencies
)
payload = sock.generate_payload(
    type_fields=[type_field]
)

price_queue = queue.Queue()  # Create a queue to pass prices between threads

# Create and start the threads
event_loop = asyncio.new_event_loop()
price_fetcher = threading.Thread(target=event_loop.run_until_complete, args=(ticker(sock, payload, price_queue),))
price_printer = threading.Thread(target=print_price_thread, args=(event_loop, sock, payload, price_queue))

# Start both threads
price_fetcher.start()
price_printer.start()

This modified code creates a new event loop for the asyncio tasks in the threads and uses run_until_complete to run the ticker coroutine in the correct thread. Additionally, the print_price_thread function now takes the event loop as an argument.

You can try and see if it resolves the issues you were facing!