Question: I am working on a project where I need to validate a large Pandas DataFrame against a JSON schema using the jsonschema library. To speed up validation, I am trying to parallelize it with Python's concurrent.futures module. However, I am running into problems with the apply call inside the lambda that I submit to the executor; it does not seem to work with parallel execution.
Details:
I have a Pandas DataFrame containing financial data that I want to validate against a JSON schema before inserting it into a MongoDB collection. The validation function is based on the jsonschema library.
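For reference, the per-row check converts each row to a dict and calls jsonschema.validate on it. A minimal, self-contained sketch of that check (the schema and field values below are simplified placeholders, not my real schema):

from jsonschema import validate, ValidationError

# Placeholder schema; the real one has many more fields
schema = {
    "type": "object",
    "properties": {
        "symbol": {"type": "string"},
        "close": {"type": "number"},
        "date": {"type": "string"},
        "time": {"type": "string"},
    },
    "required": ["symbol", "close", "date", "time"],
}

row_document = {"symbol": "ABC", "close": 101.5, "date": "2024-01-02", "time": "09:15:00"}

try:
    validate(row_document, schema)  # raises ValidationError if the document does not match
except ValidationError as e:
    print(f"Invalid document: {e.message}")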
Here is a simplified version of the part of my code that submits the validation work to the executor:
try:
    futures = [executor.submit((lambda c:
        not c.empty and c.apply(lambda r: validateDocument(r, schema),
                                axis=1))(chunk)) for chunk in chunks]
    # Wait for any thread to finish and check for validation errors
    for future in as_completed(futures):
        if callable(future.result):
            future.result()
except ValidationError as e:
    PriceHistoryDB.__logger.error(f"Error validating document: {e}")
    print(f"Error validating document: {e}")
    allDocumentsValid = False
    return -1
except Exception as e:
    PriceHistoryDB.__logger.error(f"Error: {e}")
    print(f"Error: {e}")
    allDocumentsValid = False
    return -1
The exception is caught in the except block at line 107 of my file.
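From what I can tell, part of the problem may be that ProcessPoolExecutor pickles whatever it sends to the worker processes, and lambdas (and functions nested inside a method, like validateDocument) cannot be pickled. On top of that, because the lambda is invoked immediately with (chunk), executor.submit receives the lambda's return value rather than a callable. A minimal sketch of the pickling limitation, separate from my real code and assuming CPython's default pickle-based process pool:

from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Submitting a lambda: the task cannot be pickled for the worker,
        # so the future ends up holding a pickling error.
        future = executor.submit(lambda x: x * 2, 21)
        print(future.result())  # raises PicklingError instead of printing 42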
Here is the full program:
import pandas as pd
from jsonschema import validate, ValidationError
from pymongo import MongoClient
import logging
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np
import warnings
warnings.filterwarnings("ignore", message="'DataFrame.swapaxes' is deprecated and will be removed in a future version. Please use 'DataFrame.transpose' instead.")
# db class and extend any required methods and export
class PriceHistoryDB:
    __logger = None

    # initialize logger with name, file name, format [time : type : message]
    def __initialize_logger(self):
        if PriceHistoryDB.__logger == None:
            print("PriceHistoryDB: Initializing logger")
            # Configure the logger
            logger = logging.getLogger("PriceHistoryDB-Logger")
            logger.setLevel(logging.ERROR)
            # Create a file handler and set its level
            handler_pricehistorydb_logger = logging.FileHandler('./logs/PriceHistoryDB.log')
            handler_pricehistorydb_logger.setLevel(logging.ERROR)
            # Create a formatter and add it to the handler
            formatter_pricehistorydb_logger = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')
            handler_pricehistorydb_logger.setFormatter(formatter_pricehistorydb_logger)
            # Add the handler to the logger
            logger.addHandler(handler_pricehistorydb_logger)
            PriceHistoryDB.__logger = logger

    def __init__(self, uri=None, db=None, collection_name=None) -> None:
        self.__initialize_logger()
        # Connect to MongoDB
        try:
            if uri == None:
                raise Exception("URI not provided")
            if db == None:
                raise Exception("Database name not provided")
            if collection_name == None:
                raise Exception("Collection name not provided")
            client = MongoClient(uri)
            db = client[db]
            self.__collection = db[collection_name]
            print(f"Connected to MongoDB DB: {db.name} Collection: {collection_name}")
        except Exception as e:
            PriceHistoryDB.__logger.error(f"Error connecting to MongoDB: {e}")
            print(f"Error connecting to MongoDB: {e}")
            return None
    # Insert data from pandas dataframes
    def Insert_Data(self, data=None, resoluion="", schema={}, workers=5):
        try:
            if type(data) != pd.core.frame.DataFrame:
                raise Exception("Invalid data provided")
            if data.empty or len(data) == 0:
                raise Exception("No data to insert")
            if resoluion == "":
                raise Exception("Resolution not provided")
            if type(schema) != dict or len(schema) == 0:
                raise Exception("Schema not provided")
            if workers < 1:
                raise Exception("Invalid number of workers")
            if workers > len(data):
                workers = len(data)
            # Validate each document against the schema before insertion
            # Number of threads to use
            num_threads = workers
            # Split data into chunks for parallel processing
            chunks = np.array_split(data, num_threads)
            allDocumentsValid = True

            def validateDocument(row, schema):
                document = row.to_dict()
                document['date'] = document['date'].__str__()
                document['time'] = document['time'].__str__()
                validate(document, schema)

            with ProcessPoolExecutor(max_workers=num_threads) as executor:
                try:
                    futures = [executor.submit((lambda c:
                        not c.empty and c.apply(lambda r: validateDocument(r, schema),
                                                axis=1))(chunk)) for chunk in chunks]
                    # Wait for any thread to finish and check for validation errors
                    for future in as_completed(futures):
                        if callable(future.result):
                            future.result()
                except ValidationError as e:
                    PriceHistoryDB.__logger.error(f"Error validating document: {e}")
                    print(f"Error validating document: {e}")
                    allDocumentsValid = False
                    return -1
                except Exception as e:
                    PriceHistoryDB.__logger.error(f"Error: {e}")
                    print(f"Error: {e}")
                    allDocumentsValid = False
                    return -1
            try:
                if allDocumentsValid != True:
                    raise Exception("Invalid document present")
                print(f"Inserting documents: {len(data)}")
                result = self.__collection.insert_many(data.to_dict(orient='records'))
                print(f"Inserted {len(result.inserted_ids)} documents.")
                # Save results to CSV file
                try:
                    symbol = data.iloc[0]['symbol']
                    se = pd.Series(result.inserted_ids)
                    se.to_csv(f"./csv/PriceHistoryDB/{symbol}_{resoluion}_inserted_ids_PriceHistory.csv", index=False)
                except Exception as e:
                    print(str(e))
                    PriceHistoryDB.__logger.error(str(e))
                return len(result.inserted_ids)
            except Exception as e:
                PriceHistoryDB.__logger.error(f"Error inserting documents: {e}")
                print(f"Error inserting documents: {e}")
                return -1
        except Exception as e:
            PriceHistoryDB.__logger.error(f"Error: {e}")
            print(f"Error: {e}")
            return -1
I would like help optimizing the validation process with multiprocessing. Specifically, I am running into issues with the apply call and would appreciate guidance on how to iterate over DataFrame rows efficiently across processes. Any insights or alternative approaches would be highly appreciated. The requirements are: the validation checks should run in multiple processes to use the available CPU cores, and all processing should stop as soon as a validation error is encountered.
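One direction I have been considering, though I am not sure it is the right approach, is to move the validation helper to module level so it can be pickled, hand each worker a whole chunk converted to plain dicts, and cancel the remaining futures on the first ValidationError. A rough sketch of what I have in mind (validate_chunk and validate_dataframe are my own placeholder names, not an existing API):

from concurrent.futures import ProcessPoolExecutor, as_completed
from jsonschema import validate, ValidationError

def validate_chunk(records, schema):
    # Runs in a worker process: validate every record in the chunk,
    # raising ValidationError on the first invalid document.
    for document in records:
        document['date'] = str(document['date'])
        document['time'] = str(document['time'])
        validate(document, schema)
    return len(records)

def validate_dataframe(data, schema, workers=5):
    # Convert each chunk to plain dicts so only picklable objects cross
    # the process boundary; iloc slicing would also sidestep the
    # DataFrame.swapaxes warning I currently suppress for np.array_split.
    chunk_size = max(1, len(data) // workers)
    chunks = [data.iloc[i:i + chunk_size].to_dict(orient='records')
              for i in range(0, len(data), chunk_size)]
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(validate_chunk, chunk, schema) for chunk in chunks]
        try:
            for future in as_completed(futures):
                future.result()  # re-raises any ValidationError from the worker
        except ValidationError:
            # Cancel any chunks that have not started yet
            for f in futures:
                f.cancel()
            raise
    return True

Would something along these lines be the idiomatic way to do this, or is there a better pattern for iterating over DataFrame rows across processes?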