Code not running on M1 Pro Mac but does on Intel, how to solve this?

56 Views Asked by At
import logging
import math
import os
import threading
from time import sleep

import nest_asyncio
import pandas as pd
from binance.enums import *
# from binance.client import Client
from binance.streams import ThreadedWebsocketManager

# Patch asyncio before any websocket machinery starts.
# NOTE(review): presumably needed because the script is run inside an
# environment that already owns an event loop (notebook/IDE) — confirm.
nest_asyncio.apply()

# Column layout shared by every DataFrame in this script. The first entry
# ('timestamp') is used as the index; the remaining four are the value
# columns that get converted to numeric and null-filled.
columns = ['timestamp', 'best_bid_price', 'best_bid_qty', 'best_ask_price', 'best_ask_qty']
# Root logger: INFO and above, prefixed with time and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Rows received from the websocket callback, waiting to be flushed into
# order_book_df. Each row is a list ordered like `columns`.
buffer = []
# Number of buffered rows that triggers a flush in process_message().
BUFFER_SIZE = 3 # NOTE(review): 10000 was noted as an alternative value, presumably for real runs — confirm

def process_message(msg):
    """Websocket callback: buffer the best bid/ask of one depth message.

    Appends ``[timestamp, bid_price, bid_qty, ask_price, ask_qty]`` to the
    module-level ``buffer`` and flushes it through ``update_dataframe()``
    once ``BUFFER_SIZE`` rows have accumulated.

    Args:
        msg: Mapping delivered by the websocket manager. A depth message is
            expected to carry the event time in milliseconds under ``'E'``
            and lists of ``[price, quantity]`` pairs under ``'b'`` (bids)
            and ``'a'`` (asks).

    Returns:
        None.
    """
    # The websocket manager also reports connection problems through this
    # same callback as {'e': 'error', ...}. Such payloads have no 'E'/'b'/'a'
    # keys; indexing them would raise KeyError inside the websocket thread,
    # where the failure is easy to miss. Log and skip instead.
    if msg.get('e') == 'error' or 'E' not in msg:
        logging.error("Skipping non-depth websocket message: %s", msg)
        return

    print(30 * "-")
    timestamp = pd.to_datetime(msg['E'], unit='ms')

    # Take the first bid/ask entry, or a [None, None] placeholder when the
    # side is missing or empty so the row always has five fields.
    bids = msg.get('b')
    asks = msg.get('a')
    best_bid = bids[0] if bids else [None, None]
    best_ask = asks[0] if asks else [None, None]
    buffer.append([timestamp, best_bid[0], best_bid[1], best_ask[0], best_ask[1]])

    if len(buffer) >= BUFFER_SIZE:
        update_dataframe()

def update_dataframe():
    """Flush the buffered rows into the module-level ``order_book_df``.

    Builds a frame from ``buffer`` (indexed by ``'timestamp'``), converts the
    value columns to numeric, fills missing values via ``fill_nulls()``,
    appends the result to ``order_book_df``, clears ``buffer`` and keeps only
    the most recent 1000 rows.

    Returns:
        None. ``order_book_df`` is rebound and ``buffer`` is emptied.

    Raises:
        ValueError: propagated from ``pd.to_numeric`` if a buffered value
            cannot be parsed as a number.
    """
    global order_book_df

    if not buffer:
        # Nothing to flush; avoid building and concatenating an empty frame.
        return

    print(30 * "-")
    logging.info(f"Updating DataFrame with {len(buffer)} entries")
    temp_df = pd.DataFrame(buffer, columns=columns)
    temp_df.set_index('timestamp', inplace=True)
    print("temp_df : \n", temp_df)

    # Every column except the timestamp holds a price or quantity; convert
    # them all to numeric dtype (None becomes NaN) before null-filling.
    for col in columns[1:]:
        temp_df[col] = pd.to_numeric(temp_df[col])

    temp_df = fill_nulls(temp_df)

    print("temp_df (Buffer) : \n", temp_df)

    # Concatenating onto an empty frame is deprecated in recent pandas and
    # can change the resulting dtypes, so the first batch simply becomes the
    # order book.
    if order_book_df.empty:
        order_book_df = temp_df
    else:
        order_book_df = pd.concat([order_book_df, temp_df])
    buffer.clear()  # Clear the buffer after updating

    # Bound memory use: keep only the latest 1000 rows.
    order_book_df = order_book_df.tail(1000)

def fill_nulls(data):
    """Forward-fill missing values in every value column of ``data``.

    For each column in ``columns[1:]``:

    * a missing first value is seeded with the last value of the same column
      in the module-level ``order_book_df``; if that frame has no rows (or
      lacks the column), the column's own mean is used instead;
    * every later missing value takes the value of the row before it.

    Args:
        data: DataFrame containing the columns named in ``columns[1:]``,
            already converted to numeric dtype.

    Returns:
        The same ``data`` object, with its columns replaced by the filled
        versions (the frame is modified in place).
    """
    for col in columns[1:]:
        series = data[col]
        if series.empty:
            continue

        if pd.isna(series.iloc[0]):
            try:
                seed = order_book_df[col].iloc[-1]
            except (IndexError, KeyError):
                # No history yet: fall back to the mean of this batch.
                seed = series.mean()
            # Work on an explicit copy and assign the column back below.
            # Writing through chained indexing (data[col].iloc[i] = ...)
            # is not guaranteed to reach `data` and is a silent no-op under
            # pandas Copy-on-Write.
            series = series.copy()
            series.iloc[0] = seed

        # With the first row seeded, "copy the previous row" for every
        # remaining gap is exactly a forward fill.
        data[col] = series.ffill()
    return data

def analyze_data():
    """Run the exhaustion/reversal analysis on the order book every 5 seconds.

    Loops forever: sleeps, then, if the module-level ``order_book_df`` has
    rows, passes a copy of it to
    ``check_severe_exhaustion_and_trend_reversal()`` and prints the returned
    trades as a DataFrame. Any exception raised during one iteration is
    logged with its traceback and the loop continues.

    Returns:
        Never returns.
    """
    while True:
        sleep(5)  # Analysis frequency

        try:
            if order_book_df.empty:
                continue
            # Analyse a copy so this thread works on its own snapshot while
            # the websocket callback keeps updating order_book_df.
            trades = check_severe_exhaustion_and_trend_reversal(order_book_df.copy())
            trades_df = pd.DataFrame(trades)
            print("trades : \n", trades_df)
        except Exception:
            # Best effort: keep the worker alive, but record the exception
            # type and full traceback — printing only the message makes
            # failures in this background thread very hard to diagnose.
            logging.exception("Analysis iteration failed")
            

# Initialize your DataFrame and WebSocket
# Start with an empty order book that has the shared column layout and is
# indexed by 'timestamp', matching the frames built in update_dataframe().
order_book_df = pd.DataFrame(columns=columns)
order_book_df.set_index('timestamp', inplace=True)

# Credentials come from the environment; os.getenv returns None when a
# variable is not set.
api_key          = os.getenv('API_KEY')
api_secret       = os.getenv('API_SECRET')


# Websocket manager configured with testnet=True.
twm = ThreadedWebsocketManager(api_key      = api_key, 
                               api_secret   = api_secret,
                               testnet      = True)

twm.start()
# Every message on this depth stream is delivered to process_message().
# NOTE(review): '1000PEPEUSDT' looks like a futures-style symbol — confirm
# that this depth stream actually emits data for it on the testnet. If no
# message ever arrives, process_message() never runs, order_book_df stays
# empty, and the program produces no output at all.
twm.start_depth_socket(callback=process_message, symbol='1000PEPEUSDT')

# Start analysis thread
# NOTE(review): the thread is created without daemon=True and
# analyze_data() loops forever, so the process will not exit on its own.
analysis_thread = threading.Thread(target=analyze_data)
analysis_thread.start()

# Keep the main thread running
twm.join()

I'm trying to run a trading strategy using the Binance WebSocket API. The strategy is based on order-book exhaustion; I haven't included that logic here, only the most relevant part of the code.

When I try running my code it doesn't work, although it used to work before I added the trade-making logic.

Now it is stuck running forever in the terminal without producing any output whatsoever. (image of terminal)

The check_severe_exhaustion_and_trend_reversal() function essentially returns a dataframe of trades in this format:

{
    'Entry Time': '2024-03-04 09:00:00',  
    'Exit Time': '2024-03-04 12:00:00',  
    'Entry Price': 100,  
    'Exit Price': 105,  
    'Percentage Profit': 5.0
} 

I've tried using a threading.Lock() inside process_message() and analyze_data(). In addition, I've debugged the code in VS Code, but since it can only debug the main thread, I can't find where something goes wrong.

0

There are 0 best solutions below