I am working on a project where I need to take a list of ids and run each id through an API pull that can only retrieve one record's details at a time per submitted id. Essentially, I have a dataframe called df_ids that consists of over 12M ids, each of which needs to go through the function below in order to obtain the information requested by the end user for the entire population:
import pandas as pd

def ELOQUA_CONTACT(id):
    # Pull a single contact record from the Eloqua API and return it as a one-row dataframe
    API = EloquaAPI(f'1.0/data/contact/{id}')
    try:
        contactid = API['id'].lower()
    except:
        # Fall back to an empty string if the field is missing from the response
        contactid = ''
    try:
        company = API['accountName']
    except:
        company = ''
    df = pd.DataFrame([contactid, company]).T.rename(columns={0: 'contactid', 1: 'company'})
    return df
If I run something like ELOQUA_CONTACT(df_ids['Eloqua_Contact_IDs'][2]), it gives me the API record for the id stored at index 2, returned as a one-row dataframe. The issue is that I now need to scale this to the entire 12M-id population and build it in a way that it can be run and processed on a daily basis.
I have tried two techniques for parallel processing in Databricks (Python-based; AWS-backed). The first is based on a template that my manager developed for threading; when sampling it on just 1000 records, it takes just shy of 2 minutes to query.
def DealEntries(df_input, n_sets):
    # Split the row indices of df_input into n_sets contiguous ranges
    n_rows = df_input.shape[0]
    entry_per_set = n_rows // n_sets
    extra = n_rows % n_sets
    outlist = []
    for i in range(n_sets):
        if i != n_sets - 1:
            idx = range(entry_per_set * i, entry_per_set * (i + 1))
        else:
            # The last set picks up the remainder
            idx = range(entry_per_set * i, entry_per_set * (i + 1) + extra)
        outlist.append(idx)
    return outlist
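To illustrate what DealEntries returns, here is a quick sanity check on a made-up 10-row dataframe (not part of the real pipeline):

DealEntries(pd.DataFrame(range(10)), 3)
# -> [range(0, 3), range(3, 6), range(6, 10)]; the last set absorbs the remainder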
from threading import Thread

class ThreadWithReturnValue(Thread):
    # Thread subclass that captures the target's return value so join() can hand it back
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return
from math import ceil

data_input = pd.DataFrame(df_ids['Eloqua_Contact_IDs'][:1000])
rows_per_thread = 300
n_rows = data_input.shape[0]
threads = ceil(n_rows / rows_per_thread)
outlist = DealEntries(data_input, threads)
df_results = []
for i in range(threads):
    # Slice of ids assigned to this batch
    rng = [x for x in outlist[i]]
    curr_input = data_input['Eloqua_Contact_IDs'][rng]
    # One thread per id in the batch
    jobs = []
    for id in curr_input.astype(str):
        thread = ThreadWithReturnValue(target=ELOQUA_CONTACT, kwargs={'id': id})
        jobs.append(thread)
    for j in jobs:
        j.start()
    # join() returns each thread's one-row dataframe
    for j in jobs:
        df_results.append(j.join())
df_out = pd.concat(df_results)
df_out
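For what it's worth, I suspect the same effect (at most ~300 API calls in flight) could be written more compactly with concurrent.futures, which caps concurrency instead of dealing out explicit batches. This is an untested sketch reusing the same ELOQUA_CONTACT, data_input, and rows_per_thread as above:

from concurrent.futures import ThreadPoolExecutor

ids = data_input['Eloqua_Contact_IDs'].astype(str).tolist()
# max_workers mirrors the ~300 concurrent threads per batch in the version above
with ThreadPoolExecutor(max_workers=rows_per_thread) as executor:
    df_results = list(executor.map(ELOQUA_CONTACT, ids))
df_out = pd.concat(df_results, ignore_index=True)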
The second method is something that I just put together and runs in about 20 seconds.
from multiprocessing.pool import ThreadPool

# One worker thread per id in the 1000-record sample
parallels = ThreadPool(1000)
df_results = parallels.map(ELOQUA_CONTACT, list(df_ids['Eloqua_Contact_IDs'][:1000]))
df_out = pd.concat(df_results)
df_out
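A variation on this that might matter at scale would be to cap the pool size and stream results with imap_unordered so that 12M tasks are not all queued up front. This is an untested sketch; the pool size and chunksize are guesses I would need to tune against the API's rate limits:

from multiprocessing.pool import ThreadPool

all_ids = df_ids['Eloqua_Contact_IDs'].astype(str).tolist()
with ThreadPool(200) as pool:  # bounded pool instead of one thread per id
    df_results = list(pool.imap_unordered(ELOQUA_CONTACT, all_ids, chunksize=100))
df_out = pd.concat(df_results, ignore_index=True)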
The issue with both of these is that, when extrapolating the time per record from 1k to 12M, the first method would take around 916 days to run and the second would take around 167 days. This needs to be scaled and parallel processed to a level where the 12M records can run in less than a day. Are there any other methodologies or features associated with Databricks/AWS/Python/Spark/etc. that I can leverage to meet this objective? Once built, this would be put into a scheduled workflow (formerly job) in Databricks and run on its own spin-up cluster whose backend resources (CPU + RAM size) I can alter.
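For concreteness, this is roughly the kind of Spark-based pattern I am asking about: distribute the ids across the cluster and have each task call the API for its own slice, with a small thread pool inside each task. This is only an untested sketch; it assumes EloquaAPI/ELOQUA_CONTACT can be imported and run on the executors, and the partition count, per-task pool size, and output table name are placeholders I would have to tune:

import pandas as pd
from multiprocessing.pool import ThreadPool

def fetch_partition(batches):
    # Spark hands each task an iterator of pandas dataframes holding a slice of the ids;
    # a small thread pool inside the task overlaps the per-id API calls.
    with ThreadPool(32) as pool:  # per-task concurrency; a guess to tune against API limits
        for batch in batches:
            ids = batch['Eloqua_Contact_IDs'].astype(str).tolist()
            results = pool.map(ELOQUA_CONTACT, ids)
            yield pd.concat(results, ignore_index=True)

spark_ids = spark.createDataFrame(df_ids[['Eloqua_Contact_IDs']])
contacts = (spark_ids
            .repartition(400)  # number of parallel tasks; placeholder, depends on cluster + API
            .mapInPandas(fetch_partition, schema='contactid string, company string'))
contacts.write.mode('overwrite').saveAsTable('eloqua_contact_details')  # placeholder table name

Whether something like this (or applyInPandas, or an async approach) is the right direction, and how much concurrency the Eloqua API will actually tolerate, is exactly the kind of guidance I am after.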
Any insight or advice is very much welcomed. Thank you.