Using a shared queue that workers can add tasks to


I'm pretty new to Python (I mainly write code in Java). I have a Python script that's essentially a crawler: it calls phantomjs, which loads the page and returns its source along with a list of URLs it found on the page.
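For reference, the phantomjs(url) helper used below could look roughly like this. This is only a hypothetical sketch: it assumes a local phantomjs binary and a crawl.js script that prints one discovered URL per line, neither of which is shown here.

import subprocess

def phantomjs(url):
    # run PhantomJS with a crawl script; assumes `phantomjs` is on PATH and
    # `crawl.js` prints the page's discovered URLs, one per line
    output = subprocess.check_output(
        ["phantomjs", "crawl.js", url],
        universal_newlines=True,
    )
    return [line.strip() for line in output.splitlines() if line.strip()]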

I've been trying to use Python 3's multiprocessing module to do this, but I can't figure out how to use a shared queue that workers can also add to. I keep getting unpredictable results.

My previous approach used a global list of URLs, from which I extracted a chunk and sent it to the workers using map_async. At the end, I would gather all the returned URLs and append them to the global list. The problem is that each "chunk" takes as long as the slowest worker. I'm trying to modify it so that whenever a worker is done, it can pick up the next URL. However, I don't think I'm doing it correctly. Here's what I have so far:

import multiprocessing
from multiprocessing import Pool

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")

    for returned_url in returned_urls:
        urls.put(returned_url, block=True)

    print("There are " + str(urls.qsize()) + " URLs in total.\n")

if __name__ == '__main__':    
    manager = multiprocessing.Manager()
    urls = manager.Queue()
    urls.put(<some-url>)

    pool = Pool()
    while True:
        url = urls.get(block=True)
        pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()

If there is a better way to do this, please let me know. I'm crawling a known site, and the eventual terminating condition is when there are no URLs left to process. But right now it looks like it will just keep running forever. I'm not sure I can rely on queue.empty(), because the documentation says it's not reliable.


There are 2 answers below.

BEST ANSWER

Here is what I would probably do:

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")

    for returned_url in returned_urls:
        urls.put(returned_url, block=True)

    # signal finished processing this url
    urls.put('no-url')

    print("There are " + str(urls.qsize()) + " URLs in total.\n")

if __name__ == '__main__':    
    manager = multiprocessing.Manager()
    pool = Pool()
    urls = manager.Queue()

    # start first url before entering loop
    counter = 1
    pool.apply_async(worker, (<some-url>, urls))

    while counter > 0:
        url = urls.get(block=True)
        if url == 'no-url':
            # a url has finished processing
            counter -= 1
        else:
            # a new url needs to be processed
            counter += 1
            pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()

Whenever a real url is popped off the queue and submitted to the pool, increment the counter. Think of it as a "currently processing url" counter. When a 'no-url' marker is popped off the queue, a "currently processing url" has finished, so decrement the counter. As long as the counter is greater than 0, there are urls that haven't finished processing and returned 'no-url' yet.

EDIT

As I said in the comment (put here for anyone else who reads it), when using a multiprocessing.Pool, instead of thinking of it as individual processes, it's best to think of it as a single construct that executes your function each time it gets data (concurrently when possible). This is most useful for data-driven problems where you don't track or care about individual worker processes, only the data being processed.
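As a minimal sketch of that data-driven style (not tied to the crawler above; the process function here is just a placeholder), imap_unordered feeds the pool data and yields results as the workers finish:

from multiprocessing import Pool

def process(item):
    # stand-in for whatever work one piece of data needs;
    # which worker process runs it doesn't matter
    return item * item

if __name__ == '__main__':
    with Pool() as pool:
        # the pool consumes the data and hands results back
        # in whatever order the workers finish them
        for result in pool.imap_unordered(process, range(20)):
            print(result)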

ANSWER

This is how I solved the problem. I originally went with the design posted in this answer, but bj0 mentioned that it was abusing the initializer function. So I decided to do it using apply_async, in a fashion similar to the code posted in my question.

Since my workers modify the queue they are reading URLs from (they add to it), I thought that I could simply run my loop like so:

while not urls.empty():
    pool.apply_async(worker, (urls.get(), urls))

I expected this to work, since the workers add to the queue and I assumed apply_async would wait if all workers were busy. It didn't work as I expected, and the loop terminated early. The problem was that it wasn't clear that apply_async does not block when all workers are busy. Instead, it queues up submitted tasks, which means that urls eventually becomes empty and the loop terminates. The only time the loop blocks is when the queue is empty at the moment you call urls.get(); then it waits for more items to become available in the queue.

I still needed a way to terminate the loop. The condition is that the loop should terminate when none of the workers return new URLs. To do this, I use a shared dict that sets the value associated with a process name to 0 if that process didn't return any URLs, and 1 otherwise. I check the sum of the values on every iteration of the loop, and if it is ever 0 I know that I am done.
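Just to make the apply_async behaviour concrete, here is a small standalone example (illustrative only, nothing to do with the crawler): submitting more tasks than there are workers returns immediately, and the waiting only happens when you collect the results.

import time
from multiprocessing import Pool

def slow(x):
    time.sleep(1)
    return x

if __name__ == '__main__':
    with Pool(2) as pool:
        start = time.time()
        # 10 tasks, only 2 workers: the extra tasks are queued, nothing blocks here
        results = [pool.apply_async(slow, (i,)) for i in range(10)]
        print("submitted in %.3f s" % (time.time() - start))  # near-instant
        print([r.get() for r in results])  # this is where the waiting happens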

The basic structure ended up being like this:

import multiprocessing
from multiprocessing import Manager, Pool

def worker(url, url_queue, proc_empty_urls_queue):

    returned_urls = phantomjs(url) # calls phantomjs and waits for output

    # report whether this url produced any new urls (1) or none (0)
    if len(returned_urls) > 0:
        proc_empty_urls_queue.put(
            [multiprocessing.current_process().name, 1]
        )
    else:
        proc_empty_urls_queue.put(
            [multiprocessing.current_process().name, 0]
        )

    for returned_url in returned_urls:
        url_queue.put(returned_url)

def empty_url_tallier(proc_empty_urls_queue, proc_empty_urls_dict):
    while 1:
        # This may not be necessary. I don't know if this worker is run
        # by the same process every time. If not, it is possible that
        # the worker was assigned the task of fetching URLs, and returned
        # some. So let's make sure that we set its entry to zero anyway.
        # If this worker is run by the same process every time, then this
        # stuff is not necessary.
        id = multiprocessing.current_process().name
        proc_empty_urls_dict[id] = 0

        proc_empty_urls = proc_empty_urls_queue.get()
        if proc_empty_urls == "done": # poison pill
            break

        proc_id = proc_empty_urls[0]
        proc_empty_url = proc_empty_urls[1]
        proc_empty_urls_dict[proc_id] = proc_empty_url

manager = Manager()

urls = manager.Queue()
proc_empty_urls_queue = manager.Queue()
proc_empty_urls_dict = manager.dict()

pool = Pool(33)

# 'writer' and 'proc_user_urls_queue' come from a part of the script not shown here
pool.apply_async(writer, (proc_user_urls_queue,))
pool.apply_async(empty_url_tallier, (proc_empty_urls_queue, proc_empty_urls_dict))

# Run the first apply synchronously 
urls.put("<some-url>")
pool.apply(worker, (urls.get(), urls, proc_empty_urls_queue))
while sum(proc_empty_urls_dict.values()) > 0:
    pool.apply_async(worker, (urls.get(), urls, proc_empty_urls_queue))

proc_empty_urls_queue.put("done") # poison pill
pool.close()
pool.join()