import logging
import math
import os
import threading
from time import sleep

import nest_asyncio
import pandas as pd
from binance.enums import *
# from binance.client import Client
from binance.streams import ThreadedWebsocketManager
# nest_asyncio patches asyncio so that event loops can be used in a nested
# (re-entrant) fashion.
nest_asyncio.apply()
# Column layout shared by the per-batch frame built in update_dataframe() and
# by the global order_book_df; 'timestamp' is later turned into the index.
columns = ['timestamp', 'best_bid_price', 'best_bid_qty', 'best_ask_price', 'best_ask_qty']
# Root logger: INFO level, "<time> - <level> - <message>" lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Rows appended by process_message() and emptied by update_dataframe().
buffer = []
# Number of buffered rows that triggers update_dataframe().
# NOTE(review): the trailing 10000 is presumably the intended non-debug value - confirm.
BUFFER_SIZE = 3 # 10000
def process_message(msg):
    """Buffer the first bid/ask level of a depth message; flush when full.

    This is the websocket callback, so it must never raise: an exception
    escaping from here would propagate into the websocket listener instead of
    being reported, which can look like the program hanging without output.

    Args:
        msg: Payload delivered by the websocket manager. A depth message is
            expected to carry 'E' (event time in ms), 'b' (bids) and 'a'
            (asks). Error payloads of the form {'e': 'error', 'm': ...} and
            anything lacking the expected keys are logged and skipped.
    """
    print(30 * "-")

    # python-binance reports stream failures through the callback as
    # {'e': 'error', 'm': <reason>}; such payloads have no 'E'/'b'/'a' keys.
    if not isinstance(msg, dict) or msg.get('e') == 'error':
        logging.error("Websocket error message received: %s", msg)
        return

    try:
        event_time_ms = msg['E']
        bids = msg['b']
        asks = msg['a']
    except KeyError as exc:
        logging.warning("Ignoring message without expected key %s: %s", exc, msg)
        return

    timestamp = pd.to_datetime(event_time_ms, unit='ms')

    # Assumes each entry of 'b' / 'a' is [price, quantity] and that the first
    # entry is the best level - TODO confirm against the stream being used.
    best_bid = bids[0] if bids else [None, None]
    best_ask = asks[0] if asks else [None, None]
    buffer.append([timestamp, best_bid[0], best_bid[1], best_ask[0], best_ask[1]])

    if len(buffer) >= BUFFER_SIZE:
        try:
            update_dataframe()
        except Exception:
            # Log with traceback and drop the batch: a row that cannot be
            # processed would otherwise make every later flush fail too,
            # while the buffer keeps growing.
            logging.exception("Flushing the buffer failed; dropping %d rows", len(buffer))
            buffer.clear()
def update_dataframe():
    """Move the buffered rows into the global ``order_book_df``.

    Builds a timestamp-indexed frame from ``buffer``, converts every value
    column to a numeric dtype, fills missing values with ``fill_nulls`` and
    appends the result to ``order_book_df``, keeping only the most recent
    1000 rows. The buffer is emptied once the batch has been merged.
    """
    global order_book_df

    print(30 * "-")
    logging.info(f"Updating DataFrame with {len(buffer)} entries")

    temp_df = pd.DataFrame(buffer, columns=columns)
    temp_df.set_index('timestamp', inplace=True)
    print("temp_df : \n", temp_df)

    # Prices and quantities are converted to numbers (missing entries become
    # NaN) so that fill_nulls and later analysis can do arithmetic on them.
    for col in columns[1:]:
        temp_df[col] = pd.to_numeric(temp_df[col])

    temp_df = fill_nulls(temp_df)
    print("temp_df (Buffer) : \n", temp_df)

    # Concatenating with an empty frame is deprecated in recent pandas and
    # lets the empty frame's object dtype leak into the result, so the very
    # first batch simply becomes the frame.
    if order_book_df.empty:
        combined = temp_df
    else:
        combined = pd.concat([order_book_df, temp_df])

    buffer.clear()  # the batch is now part of the frame

    # Bound memory use: keep only the latest 1000 rows.
    order_book_df = combined.tail(1000)
def fill_nulls(data):
    """Fill missing values in every value column of *data*.

    For each column in ``columns[1:]``:

    * a missing first row is seeded with the last value of that column in
      the global ``order_book_df`` or, when that frame has no rows yet,
      with the mean of the batch column;
    * every other missing value takes the value of the row before it
      (forward fill).

    Whole columns are assigned back instead of writing through chained
    indexing (``data[col].iloc[i] = ...``), which may modify a temporary
    copy and leave *data* unchanged.

    Args:
        data: Batch frame with numeric value columns; modified in place.

    Returns:
        The same frame, with the gaps filled where a fill value exists.
    """
    for col in columns[1:]:
        values = data[col].copy()

        if len(values) > 0 and pd.isna(values.iloc[0]):
            try:
                seed = order_book_df[col].iloc[-1]
            except (IndexError, KeyError):
                # No history yet (first batch): fall back to the batch mean.
                seed = values.mean()
            values.iloc[0] = seed

        data[col] = values.ffill()

    return data
def analyze_data():
    """Run the strategy on ``order_book_df`` every five seconds, forever.

    Intended as the target of a background thread; it never returns. Each
    round works on a copy of the frame so that the websocket thread can keep
    replacing ``order_book_df`` in the meantime. A failing round is logged
    with its full traceback and the loop carries on, so errors raised in this
    thread are visible even though no debugger is attached to it.
    """
    while True:
        sleep(5)  # analysis frequency, in seconds
        try:
            if order_book_df.empty:
                continue
            snapshot = order_book_df.copy()
            trades = check_severe_exhaustion_and_trend_reversal(snapshot)
            trades_df = pd.DataFrame(trades)
            print("trades : \n", trades_df)
        except Exception:
            # Thread boundary: report the type, message and traceback instead
            # of only the message, then keep analysing.
            logging.exception("Analysis round failed")
# Shared order-book frame, indexed by timestamp; update_dataframe() replaces
# it and analyze_data() reads it.
order_book_df = pd.DataFrame(columns=columns)
order_book_df.set_index('timestamp', inplace=True)

# Credentials come from the environment; os.getenv returns None when unset.
api_key = os.getenv('API_KEY')
api_secret = os.getenv('API_SECRET')

twm = ThreadedWebsocketManager(api_key=api_key,
                               api_secret=api_secret,
                               testnet=True)
twm.start()
# NOTE(review): '1000PEPEUSDT' looks like a futures contract name, while
# start_depth_socket presumably subscribes to the spot depth stream. If the
# callback is never invoked, confirm that the symbol exists on the market
# (and testnet) this socket connects to.
twm.start_depth_socket(callback=process_message, symbol='1000PEPEUSDT')

# Start the analysis thread. It loops forever, so it is a daemon thread:
# otherwise the process could never exit once the websocket manager stops.
analysis_thread = threading.Thread(target=analyze_data, name="analysis", daemon=True)
analysis_thread.start()

# Keep the main thread alive while the websocket manager runs; Ctrl+C stops
# the manager so the script can terminate cleanly.
try:
    twm.join()
except KeyboardInterrupt:
    twm.stop()
I'm trying to run a trading strategy using the Binance WebSocket API. The strategy is based on order-book exhaustion; I haven't included the strategy itself, only the most relevant part of the code.
When I try to run my code, it doesn't work, although it used to work before I added the part that makes trades.
Now it just keeps running forever in the terminal without producing any output whatsoever.
The check_severe_exhaustion_and_trend_reversal() function essentially returns a dataframe of trades in this format:
{
'Entry Time': '2024-03-04 09:00:00',
'Exit Time': '2024-03-04 12:00:00',
'Entry Price': 100,
'Exit Price': 105,
'Percentage Profit': 5.0
}
I've tried using a threading.Lock() inside process_message() and analyze_data(). In addition, I've debugged the code in VS Code, but since it can only debug the main thread, I can't find where things go wrong.