Python: pickle error with multiprocessing


As suggested below, I have changed my code to use Pool instead. I've also simplified my functions and included all my code below. However, now I'm getting a different error: NameError: global name 'split_files' is not defined

What I want to do is pass the actual file chunk into the parse_csv_chunk function but I'm not sure how to do that.

import csv
from itertools import islice
from collections import deque
import time
import math
import multiprocessing as mp
import os
import sys
import tempfile

csv_filename = 'test.csv'

def parse_csv_chunk(files_index):
    global split_files
    print files_index
    print len(split_files)
    return 1

def split(infilename, num_chunks):
    READ_BUFFER = 2**13
    in_file_size = os.path.getsize(infilename)
    print 'Original file size:', in_file_size
    chunk_size = in_file_size // num_chunks
    print 'Target chunk size:', chunk_size
    print 'Target number of chunks:', num_chunks
    files = []
    with open(infilename, 'rb', READ_BUFFER) as infile:
        #skip the first three lines of the input file
        infile.next()
        infile.next()
        infile.next()
        for _ in xrange(num_chunks):
            temp_file = tempfile.TemporaryFile()
            while temp_file.tell() < chunk_size:
                try:
                    #write 3 lines before checking if still < chunk_size
                    #this is done to improve performance
                    #the result is that each chunk will not be exactly the same size
                    temp_file.write(infile.next())
                    temp_file.write(infile.next())
                    temp_file.write(infile.next())
                #end of original file
                except StopIteration:
                    break
            #rewind each chunk
            temp_file.seek(0)
            files.append(temp_file)
    return files

if __name__ == '__main__':
    start = time.time()
    num_chunks = mp.cpu_count()
    split_files = split(csv_filename, num_chunks)
    print 'Number of files after splitting: ', len(split_files)

    pool = mp.Pool(processes = num_chunks)
    results = [pool.apply_async(parse_csv_chunk, args=(x,)) for x in range(num_chunks)]
    output = [p.get() for p in results]
    print output

I'm trying to split up a csv file into parts and have them processed by each of my CPU cores. This is what I have so far:

import csv
from itertools import islice
from collections import deque
import time
import math
import multiprocessing as mp
import os
import sys
import tempfile

csv_filename = 'test.csv'

def parse_csv_chunk(infile):
    #code here
    return

def split(infilename, num_chunks):
    #code here
    return files

def get_header_indices(infilename):
    #code here
    return

if __name__ == '__main__':
    start = time.time() #start measuring performance
    num_chunks = mp.cpu_count() #record number of CPU cores
    files = split(csv_filename, num_chunks) #split csv file into as many chunks as there are CPU cores and store them in a list
    print 'number of files after splitting: ', len(files)
    headers_list = get_header_indices(csv_filename) #get headers of csv file
    print headers_list

    processes = [mp.Process(target=parse_csv_chunk, 
       args=ifile) for ifile in enumerate(files)] #create a list of processes for each file chunk

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    end = time.time()

    print "Execution time: %.2f" % (end - start) #display performance

There seems to be a problem at the line 'p.start()'. I see a lot of output on the console, which eventually indicates an error:

pickle.PicklingError: Can't pickle <built-in method write of file object at 0x02
22EAC8>: it's not found as __main__.write

I did not include the code for the functions I called as they are quite long, but I can if needed. I'm wondering if I'm using multiprocessing correctly.


There is 1 best solution below


First off, is there a reason you are not using a Pool and the imap method of the Pool?
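
For reference, a bare-bones sketch of that pattern, with a placeholder worker standing in for your real parse_csv_chunk, looks roughly like this:

import multiprocessing as mp

def parse_csv_chunk(chunk_index):
    #placeholder worker: do the real parsing here and return something picklable
    return chunk_index

if __name__ == '__main__':
    num_chunks = mp.cpu_count()
    pool = mp.Pool(processes=num_chunks)
    #imap yields results in order as the workers finish, so there is no need
    #to collect apply_async handles and call get() on each one
    for result in pool.imap(parse_csv_chunk, range(num_chunks)):
        print result
    pool.close()
    pool.join()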

Second, it's very hard to tell any specifics without seeing your code, especially since the error points to parts of the code that are not provided.

However, it looks like you are using multiprocessing correctly from what you have provided -- and it's a serialization problem.

Note that if you use dill, you can serialize the write method.

>>> import dill
>>> 
>>> f = open('foo.bar', 'w') 
>>> dill.dumps(f.write)
'\x80\x02cdill.dill\n_get_attr\nq\x00cdill.dill\n_create_filehandle\nq\x01(U\x07foo.barq\x02U\x01wq\x03K\x00\x89c__builtin__\nopen\nq\x04\x89K\x00U\x00q\x05tq\x06Rq\x07U\x05writeq\x08\x86q\tRq\n.'

Most versions of multiprocessing use cPickle (or a version of pickle that is built in C), and while dill can inject its types into the Python version of pickle, it can't do so in the C equivalent.

There is a dill-activated fork of multiprocessing, so you might try that: if it's purely a pickling problem, you should be able to get past it with multiprocess.

See: https://github.com/uqfoundation/multiprocess.
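
If it is purely a pickling problem, switching is essentially a one-line change (a minimal sketch, assuming you have installed the multiprocess package, e.g. with pip install multiprocess):

import multiprocess as mp   #drop-in replacement for multiprocessing, serializes with dill

def work(x):
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=mp.cpu_count())
    print pool.map(work, range(8))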

EDIT (after OP update): The global declaration in your helper function isn't going to play well with pickle. Why not just use a payload function (like split) that reads a portion of the file itself and returns the contents, or writes to the target file? Don't return a list of files: I know they are TemporaryFile objects, but unless you use dill (and even then it's touchy) you can't pickle a file. If you absolutely have to, return the file name, not the file object, and don't use a TemporaryFile; pickle will choke trying to pass the file. So you should refactor your code, or, as I suggested earlier, see if you can bypass the serialization issues by using multiprocess (which uses dill).
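
For example, here is a rough sketch of that kind of refactoring. The worker only ever receives plain, picklable data (the file name plus a byte range), and make_chunk_ranges is a hypothetical helper; the ranges are raw byte offsets, so you would still need to align them to line breaks before doing real CSV parsing:

import multiprocessing as mp
import os

csv_filename = 'test.csv'

def parse_csv_chunk(args):
    #the worker gets only picklable data: a file name and a byte range
    infilename, start, size = args
    with open(infilename, 'rb') as infile:
        infile.seek(start)
        chunk = infile.read(size)
    #parse the chunk here and return a small, picklable result
    return len(chunk)

def make_chunk_ranges(infilename, num_chunks):
    #split the file into (filename, start, size) byte ranges, one per core
    #NOTE: these are raw byte offsets, not aligned to line breaks
    total = os.path.getsize(infilename)
    chunk_size = total // num_chunks
    ranges = []
    for i in xrange(num_chunks):
        start = i * chunk_size
        size = chunk_size if i < num_chunks - 1 else total - start
        ranges.append((infilename, start, size))
    return ranges

if __name__ == '__main__':
    num_chunks = mp.cpu_count()
    pool = mp.Pool(processes=num_chunks)
    print pool.map(parse_csv_chunk, make_chunk_ranges(csv_filename, num_chunks))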