multiprocessing broken pipe after a long time


I am developing a crawler built on the multiprocessing module.

It uses:

  - multiprocessing.Queue to store the URL infos that still need to be crawled, the page contents that still need to be parsed, and more;
  - multiprocessing.Event to control the sub-processes;
  - multiprocessing.Manager.dict to store the hashes of the crawled URLs, where each multiprocessing.Manager.dict instance uses a multiprocessing.Lock to control access.

All three types of params are shared between the parent process and all sub-processes, and all of them are organized in one class; I use an instance of that class to pass the shared params from the parent process to the sub-processes. Like this:

    MGR = SyncManager()

    class Global_Params():
        Queue_URL = multiprocessing.Queue()
        URL_RESULY = MGR.dict()
        URL_RESULY_Mutex = multiprocessing.Lock()
        STOP_EVENT = multiprocessing.Event()

    global_params = Global_Params()
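Every access to a shared dict goes through its lock; simplified, the pattern looks like this (mark_crawled and is_crawled are just illustrative names, not the real code):

    # illustrative helpers: every read/write of the shared dict holds its lock
    def mark_crawled(url_hash):
        with global_params.URL_RESULY_Mutex:
            global_params.URL_RESULY[url_hash] = True

    def is_crawled(url_hash):
        with global_params.URL_RESULY_Mutex:
            return url_hash in global_params.URL_RESULY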

In my own timeout mechanism, I use process.terminate() to stop any process that can't stop by itself within a long time!
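Simplified, the timeout mechanism looks roughly like this (run_with_timeout and CRAWL_TIMEOUT are illustrative names; the real code does more bookkeeping):

    import multiprocessing

    CRAWL_TIMEOUT = 60  # seconds, illustrative value

    def run_with_timeout(worker_func):
        p = multiprocessing.Process(target=worker_func, args=(global_params,))
        p.start()
        p.join(CRAWL_TIMEOUT)   # wait up to the timeout
        if p.is_alive():        # the worker did not stop by itself
            p.terminate()       # force-stop it
            p.join()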

In my test case there are 2500+ target sites (some are out of service, some are huge). The crawler works site by site through the target-sites file.

At the beginning the crawler worked well, but after a long time (sometimes 8 hours, sometimes 2 hours, sometimes more than 15 hours), once it has crawled more than 100 sites (the exact number is indeterminate), I get the error: "[Errno 32] Broken pipe".

I have tried the following methods to locate and solve the problem:

  1. Located the site A on which the crawler broke, then used the crawler to crawl that site separately; the crawler worked well. Even when I took a fragment (e.g. 20 sites) of the full target-sites file that contains site A, the crawler worked well!

  2. Added "-X /tmp/pymp-* 240 /tmp" to /etc/cron.daily/tmpwatch

  3. When the break occurred, the files /tmp/pymp-* were still there

  4. Used multiprocessing.managers.SyncManager instead of multiprocessing.Manager and ignored most signals except SIGKILL and SIGTERM

  5. For each target site, cleared most of the shared params (Queues, dicts and Events) and, if an error occurred, created a new instance:

    while global_params.Queue_url.qsize() > 0:
        try:
            global_params.Queue_url.get(block=False)
        except Exception, e:
            print_info(str(e))
            print_info("Clear Queue_url error!")
            time.sleep(1)
            global_params.Queue_url = Queue()
            pass

The following is the traceback info (the print_info function is one I defined myself to print and store debug info):

    [Errno 32] Broken pipe
    Traceback (most recent call last):
      File "Spider.py", line 613, in <module>
        main(args)
      File "Spider.py", line 565, in main
        spider.start()
      File "Spider.py", line 367, in start
        print_info("STATIC_RESULT size:%d" % len(global_params.STATIC_RESULT))
      File "<string>", line 2, in __len__
      File "/usr/local/python2.7.3/lib/python2.7/multiprocessing/managers.py", line 769, in _callmethod
        kind, result = conn.recv()
    EOFError

I can't understand why. Does anyone know the reason?

Answer:

I don't know if this fixes your problem, but there is one point worth mentioning:

    global_params.Queue_url.get(block=False)

... throws a Queue.Empty exception if the queue is empty. It is not worth recreating the queue just because of an empty exception.

The recreation of the queue can lead to race conditions.

From my point of view, you have two possibilities:

  1. Get rid of the "queue recreation" code block (see the sketch at the end of this answer)
  2. Switch to another Queue implementation

For the second possibility, use:

    from Queue import Queue

instead of:

    from multiprocessing import Queue
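For the first possibility, the drain loop could look like this without ever recreating the queue (untested sketch; multiprocessing.Queue.get() also raises Queue.Empty):

    from Queue import Empty  # multiprocessing's get() raises this exception too

    # drain the queue in place instead of recreating it
    while True:
        try:
            global_params.Queue_url.get(block=False)
        except Empty:
            break  # the queue is empty, nothing to recreate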