Python multiprocessing job submission based on aggregate criteria for all jobs running

I have a job that needs to do some work on a Teradata database and takes the number of db sessions it will use as an argument. The database has a maximum limit of 60 db sessions. Can I use multiprocessing to conditionally dispatch jobs so that sum(num_db_sessions) across all active child processes stays <= max_num_db_sessions?

I am pasting some pseudo-code below:

import multiprocessing as mp
import time

def dbworker(db_object, num_db_sessions):
    # Do work on db_object; the requirement is that
    # sum(num_db_sessions) across running jobs stays <= max_num_db_sessions.
    print(db_object, num_db_sessions)
    # db_objects with larger num_db_sessions take longer to finish.
    time.sleep(num_db_sessions)
    return

if __name__ == "__main__":
    max_num_db_sessions = 60
    # JobsList (db_object,num_db_sessions)
    jobs_list = [('A', 15), ('B', 15), ('C', 15), ('D', 15)
                , ('E', 1), ('F', 1), ('G', 1), ('H', 1)
                , ('I', 1), ('J', 1), ('K', 1), ('L', 1)
                , ('M', 2), ('N', 1), ('O', 1), ('P', 1)
                , ('Q', 2), ('R', 2), ('S', 2), ('T', 2)
                , ('U', 2), ('V', 2), ('W', 2), ('X', 2)
                , ('Y', 2), ('Z', 2)]
    ## Submit jobs_list to multiprocessing ####
    for db_object, num_db_sessions in jobs_list:
        dbworker(db_object, num_db_sessions)  ## -->> keep sum(num_db_sessions) <= max_num_db_sessions
    ## Is this possible??
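
For illustration, here is a minimal sketch of one way to express such a session budget, using a BoundedSemaphore as the pool of permits. This is not from the original question: run_with_budget, alloc_lock, and the three-job list are illustrative names and assumptions. Each job takes all of its permits while holding a shared lock, so two jobs can never deadlock by each holding a partial allocation.

import multiprocessing as mp
import time

def run_with_budget(db_object, num_db_sessions, sem, alloc_lock):
    # Acquire all of this job's session permits under alloc_lock; releases
    # below do not need the lock, so waiting here cannot block running
    # jobs from finishing and freeing permits.
    with alloc_lock:
        for _ in range(num_db_sessions):
            sem.acquire()
    try:
        print(db_object, num_db_sessions)  # stand-in for the real db work
        time.sleep(num_db_sessions)
    finally:
        for _ in range(num_db_sessions):
            sem.release()

if __name__ == "__main__":
    max_num_db_sessions = 60
    sem = mp.BoundedSemaphore(max_num_db_sessions)  # session budget
    alloc_lock = mp.Lock()
    procs = [mp.Process(target=run_with_budget, args=(obj, n, sem, alloc_lock))
             for obj, n in [('A', 15), ('B', 15), ('C', 40)]]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

One caveat with this sketch: jobs are admitted in whatever order they reach the lock, so a large job can make later small jobs wait. The answer below instead skips over jobs that do not currently fit.
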
1 Answer

I have figured this out. The code below does this. The key elements are:

1) Run a separate daemon process that puts tasks on the queue; its target function does the orchestration.

2) Implement a counter as a multiprocessing.Value that keeps track of the number of sessions currently in use. The counter implementation is taken from https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

3) Use a multiprocessing.Manager().list() to keep track of unsubmitted jobs.

4) Shut the workers down with the poison-pill approach: put one None on the queue per child process. This is taken from https://pymotw.com/3/multiprocessing/communication.html

The worker function uses time.sleep(num_db_sessions) as a way to simulate the workload (higher session counts mean longer processing times).

Here is the code.

import multiprocessing
import time


# Process-safe counter tracking the number of db sessions in use
# (adapted from the Eli Bendersky post linked above).
class Counter(object):
    def __init__(self, initval=0):
        self.val = multiprocessing.Value('i', initval)
        self.lock = multiprocessing.Lock()

    def increment(self,val):
        with self.lock:
            self.val.value += val

    def value(self):
        with self.lock:
            return self.val.value

def queue_manager(tasks, results, jobs_list, counter, max_num_db_sessions, num_consumers):
    proc_name = multiprocessing.current_process().name
    while len(jobs_list) > 0:
        current_counter = counter.value()
        available_sessions = max_num_db_sessions - current_counter
        # Pick the first unsubmitted job that fits in the remaining session budget.
        prop_list = [(p, s) for p, s in jobs_list if s <= available_sessions]
        if prop_list:
            job, sessions = prop_list[0]
            tasks.put(job)
            jobs_list.remove(prop_list[0])
            counter.increment(sessions)
            print("Process: {} -- submitted:{} Counter was:{} Sessions:{}".format(
                proc_name, job, current_counter, available_sessions))
        else:
            # Nothing fits right now; wait for workers to release sessions.
            print("Process: {} -- Sleeping:{} Counter is:{} Sessions:{}".format(
                proc_name, 5, current_counter, available_sessions))
            time.sleep(5)
    # All jobs have been submitted: send one poison pill per worker.
    for i in range(num_consumers):
        tasks.put(None)

def worker(tasks, counter, proc_list):
    proc_name = multiprocessing.current_process().name
    while True:
        obj = tasks.get()
        if obj is None:
            # Poison pill: no more work is coming.
            tasks.task_done()
            break
        # Look up this job's session count in the static job list.
        name, sessions = next((n, s) for n, s in proc_list if n == obj)
        print("Process: {} -- Processing:{} Sleeping for:{} Counter is:{}".format(
            proc_name, name, sessions, counter.value()))
        time.sleep(sessions)          # simulate the db work
        counter.increment(-sessions)  # hand the sessions back
        print("Process: {} -- Exiting:{} Slept for:{} Counter is:{}".format(
            proc_name, name, sessions, counter.value()))
        tasks.task_done()

if __name__ == '__main__':
    max_num_db_sessions = 60
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()  # unused for now; reserved for returning results
    mpmanager = multiprocessing.Manager()
    proc_list = [('A', 15), ('B', 15), ('C', 15), ('D', 15)
                , ('E', 1), ('F', 1), ('G', 1), ('H', 1)
                , ('I', 1), ('J', 1), ('K', 1), ('L', 1)
                , ('M', 2), ('N', 1), ('O', 1), ('P', 1)
                , ('Q', 2), ('R', 2), ('S', 2), ('T', 2)
                , ('U', 2), ('V', 2), ('W', 2), ('X', 2)
                , ('Y', 2), ('Z', 2)]
    jobs_list = mpmanager.list(proc_list)
    counter = Counter(0)
    num_cpu = 3  # number of worker processes
    d = multiprocessing.Process(name='Queue_manager_proc'
                                ,target=queue_manager
                                ,args=(tasks, results, jobs_list, counter
                                       , max_num_db_sessions, num_cpu)
                                )
    d.daemon = True  # orchestrator should not outlive the main process
    d.start()
    jobs = []
    for i in range(num_cpu):
        p = multiprocessing.Process(name="Worker_proc_{}".format(i + 1)
                                    ,target=worker
                                    # workers only read proc_list, so sharing the
                                    # plain (unmanaged) list with them is safe
                                    ,args=(tasks, counter, proc_list)
                                    )
        jobs.append(p)
        p.start()

    for job in jobs:
        job.join()

    d.join()
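
With this job list the four 15-session jobs alone consume the entire 60-session budget, so the queue manager sleeps until workers finish and decrement the counter before submitting anything else. One thing to be aware of: the counter is incremented at submission time, so a job sitting in the queue waiting for one of the 3 workers is already charged against the session budget even though it has not opened any sessions yet.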