I'm pretty new to Python (I mainly write code in Java). I have a Python script that's essentially a crawler. It calls phantomjs, which loads the page and returns its source along with a list of URLs it found in the page.
I've been trying to use Python 3's multiprocessing module to do this, but I can't figure out how to use a shared queue that workers can also add to. I keep getting unpredictable results.
My previous approach used a global list of URLs, out of which I extracted a chunk and sent it to workers using map_async. At the end, I would gather all the returned URLs and append them to the global list. The problem is that each "chunk" takes as long as the slowest worker. I'm trying to modify it so that whenever a worker is done, it can pick up the next URL. However, I don't think I'm doing it correctly. Here's what I have so far (the old chunked version is sketched at the end of this question for comparison):
import multiprocessing
from multiprocessing import Pool

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")
    # every URL found on the page goes back onto the shared queue
    for returned_url in returned_urls:
        urls.put(returned_url, block=True)
    print("There are " + str(urls.qsize()) + " URLs in total.\n")
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    urls = manager.Queue()
    urls.put("<some-url>")  # seed the shared queue with the starting URL

    pool = Pool()
    while True:
        url = urls.get(block=True)
        pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()
If there is a better way to do this, please let me know. I'm crawling a known site, and the eventual terminating condition is when there are no more URLs to process. But right now it looks like it will just keep running forever. I'm not sure whether I should use queue.empty(), because the documentation says it's not reliable.
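For comparison, the old chunk-based version was roughly like this (simplified; phantomjs is the same call as above and the chunk size is arbitrary):
from multiprocessing import Pool

def worker(url):
    return phantomjs(url)               # URLs found on that page

if __name__ == '__main__':
    urls = ["<some-url>"]               # global list of URLs left to crawl
    pool = Pool()
    while urls:
        chunk, urls = urls[:10], urls[10:]          # peel a chunk off the front
        result = pool.map_async(worker, chunk)
        # result.get() blocks until the whole chunk is done,
        # i.e. until the slowest worker in the chunk finishes
        for returned_urls in result.get():
            urls.extend(returned_urls)
    pool.close()
    pool.join()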
Here is what I would probably do:
Have each worker put a special 'no-url' marker on the queue when it has finished processing a URL. Then, in the main loop, whenever a real URL is popped off the queue, increment a counter. Think of it as a "currently processing url" counter. When a 'no-url' is popped off the queue, a "currently processing url" has finished, so decrement the counter. As long as the counter is greater than 0, there are URLs that haven't finished processing and returned 'no-url' yet; once it drops back to 0, nothing is in flight and nothing new can be added, so you can shut the pool down.
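Roughly, something like this, reusing your worker and phantomjs setup (NO_URL and in_flight are just names I picked):
import multiprocessing
from multiprocessing import Pool

NO_URL = "no-url"   # sentinel a worker puts on the queue when it has finished one URL

def worker(url, urls):
    returned_urls = phantomjs(url)      # your existing call that loads the page
    for returned_url in returned_urls:
        urls.put(returned_url, block=True)
    urls.put(NO_URL, block=True)        # tell the main loop this URL is done

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    urls = manager.Queue()
    urls.put("<some-url>")

    pool = Pool()
    in_flight = 0                       # URLs handed to workers but not yet finished
    while True:
        item = urls.get(block=True)
        if item == NO_URL:
            in_flight -= 1
            if in_flight == 0:          # nothing running and, since each worker puts
                break                   # its URLs before its sentinel, nothing queued
        else:
            in_flight += 1
            pool.apply_async(worker, (item, urls))

    pool.close()
    pool.join()
One thing to watch: if worker ever raises before putting NO_URL, the counter never reaches 0 and the loop hangs, so you may want a try/finally around that last put.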
EDIT
As I said in the comment (put here for anyone else who reads it), when using a multiprocessing.Pool, instead of thinking of it as individual processes, it's best to think of it as a single construct that executes your function each time it gets data (concurrently when possible). This is most useful for data-driven problems where you don't track or care about individual worker processes, only the data being processed.
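As an illustration of that mindset (not the crawler code itself), imap_unordered treats the pool exactly like that: you feed it data and consume results as individual items finish, so one slow item never holds back the rest:
from multiprocessing import Pool

def process(item):
    # stand-in for whatever work one piece of data needs
    return item * item

if __name__ == '__main__':
    with Pool() as pool:
        # results come back as each item finishes, in whatever order,
        # without tracking which worker process handled which item
        for result in pool.imap_unordered(process, range(20)):
            print(result)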