I am trying to write a cryptocurrency trading program for Upbit in Python.
I want to use two threads. The first thread obtains real-time cryptocurrency prices through the Upbit WebSocket API. The second thread receives the price from the first thread and, if the price is higher/lower than a specific price, sells/buys.
I wrote sample code that fetches the real-time price using WebSockets, as below:
import json
import asyncio
import nest_asyncio
from upbit.websocket import UpbitWebSocket

nest_asyncio.apply()

async def ticker(sock, payload):
    async with sock as conn:
        await conn.send(payload)
        while True:
            recv = await conn.recv()
            data = recv.decode('utf8')
            result = json.loads(data)
            print(result)

sock = UpbitWebSocket()
currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
    type='ticker',
    codes=currencies
)
payload = sock.generate_payload(
    type_fields=[type_field]
)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(ticker(sock, payload))
It works well. Based on this code, I tried to move on to the next step. Before doing any real trading, I wanted the second thread to print the price obtained by the first thread. I wrote the code below, but it does not work properly.
import json
import asyncio
import nest_asyncio
import threading
from upbit.websocket import UpbitWebSocket
import time
import tracemalloc
import queue

tracemalloc.start()
nest_asyncio.apply()

# Function to fetch real-time BTC price
async def ticker(sock, payload, price_queue):
    async with sock as conn:
        await conn.send(payload)
        while True:
            recv = await conn.recv()
            data = recv.decode('utf8')
            result = json.loads(data)
            price_queue.put(result)  # Put the price in the queue

# Thread to print the BTC price
def print_price_thread(price_queue):
    while True:
        price = price_queue.get()  # Get the price from the queue
        print(price)

sock = UpbitWebSocket()
currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
    type='ticker',
    codes=currencies
)
payload = sock.generate_payload(
    type_fields=[type_field]
)
price_queue = queue.Queue()  # Create a queue to pass prices between threads

# Create and start the threads
price_fetcher = threading.Thread(target=ticker, args=(sock, payload, price_queue))
price_printer = threading.Thread(target=print_price_thread, args=(price_queue,))

# Start both threads
price_fetcher.start()
price_printer.start()
I think combining threads and asyncio is quite tricky. Is there any way to make the above code work?
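It is likely because the ticker coroutine is never actually run. When threading.Thread calls ticker(...), the call only creates a coroutine object, and no event loop is running in that thread to execute it. To make this work, give the thread its own event loop and run the coroutine on it with loop.run_until_complete. (loop.run_in_executor is meant for the opposite case: running a blocking function from async code.)

Here's a modified version of your code: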
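The approach can be sketched as follows. This is a minimal sketch under stated assumptions, not a definitive implementation: fake_ticker is a hypothetical stand-in for ticker(sock, payload, price_queue) so that the example runs without a network connection, the trade_price field and the limit of three ticks are illustrative, and the stop future is just one possible way to let the printer thread shut the fetcher down.

```python
import asyncio
import queue
import threading


async def fake_ticker(price_queue, stop):
    """Hypothetical stand-in for ticker(sock, payload, price_queue)."""
    price = 100
    while not stop.done():
        # queue.Queue is thread-safe, so put() is fine from the loop's thread.
        price_queue.put({"code": "KRW-BTC", "trade_price": price})
        price += 1
        await asyncio.sleep(0.01)  # stands in for `await conn.recv()`


def fetch_price_thread(loop, price_queue, stop):
    # Drive the coroutine to completion on the loop, inside this thread.
    try:
        loop.run_until_complete(fake_ticker(price_queue, stop))
    finally:
        loop.close()


def print_price_thread(loop, price_queue, received, limit=3):
    # Plain blocking consumer; no asyncio is needed on this side.
    while len(received) < limit:
        tick = price_queue.get()
        print(tick)
        received.append(tick)
    # Ask the fetcher to stop. call_soon_threadsafe is the thread-safe way
    # to schedule work on a loop that runs in another thread.
    loop.call_soon_threadsafe(stop.set_result, None)


loop = asyncio.new_event_loop()  # dedicated loop for the fetcher thread
stop = loop.create_future()      # resolved when the printer has seen enough
price_queue = queue.Queue()      # passes ticks between the two threads
received = []

price_fetcher = threading.Thread(
    target=fetch_price_thread, args=(loop, price_queue, stop)
)
price_printer = threading.Thread(
    target=print_price_thread, args=(loop, price_queue, received)
)
price_fetcher.start()
price_printer.start()
price_printer.join()
price_fetcher.join()
```

With the real feed, the body of fake_ticker would be replaced by the async with sock as conn: ... loop from the question, and the stop check would run after each received message.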
This modified code creates a new event loop for the asyncio tasks in the threads and uses run_until_complete to run the ticker coroutine in the correct thread. Additionally, the print_price_thread function now takes the event loop as an argument.

You can try it and see if it resolves the issues you were facing!
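For the next step described in the question (selling above or buying below a specific price), the consumer thread could pass each tick through a small decision function before placing any order. The sketch below is only an illustration: decide and handle_tick are hypothetical helpers, the thresholds are made-up numbers, trade_price is assumed to be the field that carries the latest price in the ticker message, and no order is actually placed.

```python
def decide(price, buy_below, sell_above):
    """Return 'buy', 'sell', or 'hold' for one price (hypothetical helper)."""
    if price < buy_below:
        return "buy"
    if price > sell_above:
        return "sell"
    return "hold"


def handle_tick(tick, buy_below, sell_above):
    # Assumption: the latest price is stored under the 'trade_price' key.
    price = tick["trade_price"]
    return decide(price, buy_below, sell_above)


# Illustrative thresholds only; real values depend on the strategy.
actions = [
    handle_tick({"trade_price": p}, buy_below=95, sell_above=105)
    for p in (90, 100, 110)
]
print(actions)
```

Keeping the decision in a pure function like this makes it easy to test without a live connection; print_price_thread would call handle_tick on each item it takes from the queue.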