What is the best way to parallel process in DataBricks to minimize query time?


I am working on a project where I need to take a list of ids and run each id through an API pull that can only retrieve one record detail at a time per submitted id. Essentially, I have a dataframe called df_ids that consists of over 12M ids, each of which needs to go through the function below in order to obtain the information requested by the end user for the entire population:

import pandas as pd

def ELOQUA_CONTACT(id):
  # Pull a single contact record from the Eloqua API by id.
  API = EloquaAPI(f'1.0/data/contact/{id}')
  try:
    contactid = API['id'].lower()
  except:
    contactid = ''
  try:
    company = API['accountName']
  except:
    company = ''

  # Return the two fields as a one-row dataframe.
  df = pd.DataFrame([contactid, company]).T.rename(columns={0: 'contactid', 1: 'company'})
  return df

If I run something like ELOQUA_CONTACT(df_ids['Eloqua_Contact_IDs'][2]), it gives me the API record for the id stored at index 2 in the form of a dataframe. The issue is that I now need to scale this to the entire 12M-id population and build it in a way that can be run and processed on a daily basis.

I have tried two techniques for parallel processing in Databricks (Python based; AWS backed). The first is based on a template my manager developed for threading; when sampling just 1,000 records it takes just shy of 2 minutes to query.

def DealEntries(df_input, n_sets):
    # Split the dataframe's row indices into n_sets contiguous ranges;
    # the last range absorbs any remainder rows.
    n_rows = df_input.shape[0]
    entry_per_set = n_rows // n_sets
    extra = n_rows % n_sets

    outlist = []
    for i in range(n_sets):
        if i != n_sets - 1:
            idx = range(entry_per_set * i, entry_per_set * (i + 1))
        else:
            idx = range(entry_per_set * i, entry_per_set * (i + 1) + extra)
        outlist.append(idx)
    return outlist
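
# For reference, DealEntries just deals the row indices into near-equal ranges
# (illustrative values only), e.g.:
#   DealEntries(pd.DataFrame({'x': range(10)}), 3)
#   -> [range(0, 3), range(3, 6), range(6, 10)]   # last range absorbs the remainder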

from threading import Thread

class ThreadWithReturnValue(Thread):
    # Thin Thread wrapper that captures the target's return value so it can
    # be collected via join().
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self, *args):
        Thread.join(self, *args)
        return self._return
      
from math import ceil

# Sample run: split the first 1,000 ids into batches and, within each batch,
# start one thread per id.
data_input = pd.DataFrame(df_ids['Eloqua_Contact_IDs'][:1000])
rows_per_thread = 300
n_rows = data_input.shape[0]
threads = ceil(n_rows / rows_per_thread)

outlist = DealEntries(data_input, threads)

df_results = []
for i in range(threads):
    rng = [x for x in outlist[i]]
    curr_input = data_input['Eloqua_Contact_IDs'][rng]

    # One thread per id in the current batch.
    jobs = []
    for id in curr_input.astype(str):
        thread = ThreadWithReturnValue(target=ELOQUA_CONTACT, kwargs={'id': id})
        jobs.append(thread)

    for j in jobs:
        j.start()

    # Collect each thread's one-row dataframe.
    for j in jobs:
        df_results.append(j.join())

df_out = pd.concat(df_results)
df_out

The second method is something I just put together; it runs in about 20 seconds on the same 1,000-record sample.

from multiprocessing.pool import ThreadPool

# One worker thread per id in the 1,000-record sample.
parallels = ThreadPool(1000)
df_results = parallels.map(ELOQUA_CONTACT, [i for i in df_ids['Eloqua_Contact_IDs'][:1000]])
df_out = pd.concat(df_results)
df_out
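
To put the scaling in concrete terms, running the full population through the second method would mean handing the pool the whole id list (or looping it in chunks), roughly along the lines below. This is only a sketch of what the full-volume run would look like, not something I have executed end to end:

# Sketch only: feed the full 12M-id population through the same ThreadPool in
# 1,000-id chunks so intermediate results can be collected incrementally.
all_ids = list(df_ids['Eloqua_Contact_IDs'].astype(str))
chunks = [all_ids[i:i + 1000] for i in range(0, len(all_ids), 1000)]

df_results = []
for chunk in chunks:
    df_results.extend(parallels.map(ELOQUA_CONTACT, chunk))
df_out = pd.concat(df_results)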

The issue with both of these is that, scaling the per-record time up from 1k to 12M, the first method would take around 916 days to run and the second around 167 days. This needs to be scaled and parallel processed to a level that can run the 12M records in less than a day. Are there any other methodologies or features of Databricks/AWS/Python/Spark/etc. that I can leverage to meet this objective? Once built, this would be put into a scheduled workflow (formerly called a job) in Databricks and run on its own spin-up cluster whose backend resources (CPU + RAM size) I can adjust.
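
For context, the direction I have been wondering about is pushing the fan-out onto the Spark executors instead of a single driver's threads, roughly like the sketch below. I have not tested this: the column name and the EloquaAPI wrapper are from my code above, while the partition count, the dict-style .get() access, and the output table name are just assumptions.

# Untested sketch: distribute the ids as a Spark DataFrame and let each executor
# pull its slice of records via mapInPandas. Assumes the EloquaAPI wrapper is
# importable on the workers and returns a dict, as the indexing above suggests.
import pandas as pd

# In the real job the 12M ids would presumably be read from a table rather than
# converted from a pandas frame; the partition count here is a guess.
ids_sdf = spark.createDataFrame(df_ids[['Eloqua_Contact_IDs']]).repartition(512)

def fetch_contacts(batches):
    # batches is an iterator of pandas DataFrames covering one partition
    for pdf in batches:
        rows = []
        for id in pdf['Eloqua_Contact_IDs'].astype(str):
            API = EloquaAPI(f'1.0/data/contact/{id}')
            rows.append({'contactid': str(API.get('id', '')).lower(),
                         'company': API.get('accountName', '')})
        yield pd.DataFrame(rows, columns=['contactid', 'company'])

result_sdf = ids_sdf.mapInPandas(fetch_contacts, schema='contactid string, company string')
result_sdf.write.mode('overwrite').saveAsTable('eloqua_contact_details')  # hypothetical table name

Is something along these lines the right direction, or is there a better Databricks-native pattern (pandas UDFs, async calls per partition, etc.) for high-volume API pulls like this?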

Any insight or advice is very much welcomed. Thank you.
