Python multiprocessing threads with return codes

40 Views Asked by At

I'm trying to run another script many times and have them run in parallel. The script returns a 1 for success and a 0 for failure. I need to get the sum of the return codes so I know how many succeeded.

import multiprocessing
import subprocess

class my_class():
  # Question version: fans out `runs` launches of script.py over a
  # multiprocessing pool and tries to total their return codes through a
  # manager queue drained by a listener task.
  # NOTE(review): the post reports that this version fails with
  # cPickle.PicklingError because instance methods are submitted to the pool.
  def value(self, state):
    """
    Returns the value of the given state

    Submits one listener task and `runs` (50) worker tasks to a process pool,
    waits for every worker, then tells the listener to stop by putting 'kill'
    on the queue. The rate is computed from self.wins.
    """
    runs = 50
    self.wins = 0
    q = multiprocessing.Manager().Queue()
    pool = multiprocessing.Pool(multiprocessing.cpu_count() + 2)
    # self.listener is a bound method; NOTE(review): this is the kind of
    # callable the reported PicklingError complains about.
    watcher = pool.apply_async(self.listener, (q,))

    jobs = []
    for i in range(runs):
      # Same concern as above: self.worker is a bound method.
      job = pool.apply_async(self.worker, (state, q))
      jobs.append(job)

    # Block until every worker task has finished (and surface its exception,
    # if any) before stopping the listener.
    for job in jobs:
      job.get()

    q.put('kill')
    pool.close()
    pool.join()

    # NOTE(review): listener assigns self.wins inside a pool task; presumably
    # that happens on a copy of this object in another process, so the value
    # read here may still be the 0 set above -- confirm. watcher.get() is
    # never called, so nothing the listener computes is fetched back.
    wins = self.wins
    print "Rate: {}/{} -> {}%".format(wins, runs, float(wins)/runs*100)
    # `sys` is not imported in the lines shown, and this call sits before the
    # return, so the return statement below is never reached.
    sys.exit()
    return float(wins)/runs


  def worker(self, state, q):
    # Runs `python script.py <state>` once and puts the child's exit status on
    # q. stdout is piped and read by communicate(); out/err are not used.
    p = subprocess.Popen(["python", "script.py", state], stdout=subprocess.PIPE)
    out, err = p.communicate()
    q.put(p.returncode)
    return


  def listener(self, q):
    # Adds up every message taken from q until the 'kill' sentinel arrives,
    # then stores the total on self.wins.
    wins = 0
    while True:
      # Polls q.empty() in a tight loop with no sleep or blocking get().
      if not q.empty():
        m = q.get()
        if m == 'kill':
          break
        wins += m
    self.wins = wins
    return

Running this code raises:

cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

I've used similar code before to write to a file so I'm not sure what I'm doing wrong. Please help!

EDIT: (SOLUTION)

Thanks to the responses I learned what pickling is, but that wasn't really my question. My question was how to get the return codes back from the worker processes. This was easily accomplished by making the worker a module-level function instead of a method of the class, and having it return the code so it can be collected with job.get().

This is my solution:

import multiprocessing
import subprocess

class my_class():
  def value(self, state):
    """
    Returns the value of the given state.

    Submits `runs` (50) asynchronous calls of the module-level `worker`
    function to a process pool, adds up the integer each call returns (the
    exit status of script.py) and returns that total divided by `runs`.
    The rate is also printed.

    Any exception raised inside a job is re-raised here by AsyncResult.get().
    """
    runs = 50
    # `worker` still takes a queue as its second argument, so one is passed
    # for signature compatibility; results come back through job.get().
    q = multiprocessing.Manager().Queue()
    pool = multiprocessing.Pool(multiprocessing.cpu_count() + 2)
    try:
      jobs = [pool.apply_async(worker, (state, q)) for _ in range(runs)]
      # get() blocks until the job has finished, so once the sum is computed
      # every job is done.
      wins = sum(job.get() for job in jobs)
      pool.close()
      pool.join()
    finally:
      # Harmless after a clean close()/join(); on an error it stops the
      # remaining pool processes instead of leaving them running.
      pool.terminate()

    rate = float(wins) / runs
    # Parenthesised single-argument form prints the same text on Python 2
    # and Python 3.
    print("Rate: {}/{} -> {}%".format(wins, runs, rate * 100))
    # The earlier sys.exit() is dropped: `sys` was never imported here and the
    # call prevented this return from ever being reached.
    return rate

def worker(state, q):
  """
  Run `python script.py <state>` once and return the child's exit status.

  `q` is accepted but not used by this function; the result is handed back
  as the return value.
  """
  command = ["python", "script.py", state]
  child = subprocess.Popen(command, stdout=subprocess.PIPE)
  # communicate() reads the piped stdout to the end and waits for the child
  # to exit, after which returncode is set.
  child.communicate()
  return child.returncode
0

There are 0 best solutions below