As suggested in the answer below, I have changed my code to use a Pool instead. I've also simplified my functions and included all of my code below. However, now I'm getting a different error:

NameError: global name 'split_files' is not defined

What I want to do is pass the actual file chunk into the parse_csv_chunk function, but I'm not sure how to do that.
import csv
from itertools import islice
from collections import deque
import time
import math
import multiprocessing as mp
import os
import sys
import tempfile

csv_filename = 'test.csv'

def parse_csv_chunk(files_index):
    global split_files
    print files_index
    print len(split_files)
    return 1

def split(infilename, num_chunks):
    READ_BUFFER = 2**13
    in_file_size = os.path.getsize(infilename)
    print 'Original file size:', in_file_size
    chunk_size = in_file_size // num_chunks
    print 'Target chunk size:', chunk_size
    print 'Target number of chunks:', num_chunks
    files = []
    with open(infilename, 'rb', READ_BUFFER) as infile:
        infile.next()
        infile.next()
        infile.next()
        for _ in xrange(num_chunks):
            temp_file = tempfile.TemporaryFile()
            while temp_file.tell() < chunk_size:
                try:
                    #write 3 lines before checking if still < chunk_size
                    #this is done to improve performance
                    #the result is that each chunk will not be exactly the same size
                    temp_file.write(infile.next())
                    temp_file.write(infile.next())
                    temp_file.write(infile.next())
                #end of original file
                except StopIteration:
                    break
            #rewind each chunk
            temp_file.seek(0)
            files.append(temp_file)
    return files

if __name__ == '__main__':
    start = time.time()
    num_chunks = mp.cpu_count()
    split_files = split(csv_filename, num_chunks)
    print 'Number of files after splitting: ', len(split_files)
    pool = mp.Pool(processes=num_chunks)
    results = [pool.apply_async(parse_csv_chunk, args=(x,)) for x in range(num_chunks)]
    output = [p.get() for p in results]
    print output
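Roughly, this is the shape I'm after (an untested sketch, with parse_csv_chunk just counting lines as a placeholder, and split() and csv_filename as defined above): the parent would read each temporary file back into a string and pass that string to the worker, so the worker never needs a global.

import multiprocessing as mp

def parse_csv_chunk(chunk_text):
    #placeholder payload: just count the lines in this chunk
    return len(chunk_text.splitlines())

if __name__ == '__main__':
    num_chunks = mp.cpu_count()
    split_files = split(csv_filename, num_chunks)
    #read each chunk back into a plain string in the parent;
    #strings pickle cleanly, so they can be passed as arguments
    chunks = [f.read() for f in split_files]
    pool = mp.Pool(processes=num_chunks)
    print pool.map(parse_csv_chunk, chunks)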
I'm trying to split up a csv file into parts and have them processed by each of my CPU's cores. This is what I have so far:
import csv
from itertools import islice
from collections import deque
import time
import math
import multiprocessing as mp
import os
import sys
import tempfile

def parse_csv_chunk(infile):
    #code here
    return

def split(infilename, num_chunks):
    #code here
    return files

def get_header_indices(infilename):
    #code here
    return

if __name__ == '__main__':
    start = time.time() #start measuring performance
    num_chunks = mp.cpu_count() #record number of CPU cores
    files = split(csv_filename, num_chunks) #split csv file into as many chunks as there are CPU cores and store as a list
    print 'number of files after splitting: ', len(files)
    get_header_indices(csv_filename) #get headers of csv file
    print headers_list
    processes = [mp.Process(target=parse_csv_chunk,
                            args=ifile) for ifile in enumerate(files)] #create one process per file chunk
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    end = time.time()
    print "Execution time: %.2f" % (end - start) #display performance
There seems to be a problem at the line 'p.start()'. I see a lot of output on the console, which eventually indicates an error:
pickle.PicklingError: Can't pickle <built-in method write of file object at 0x0222EAC8>: it's not found as __main__.write
I did not include the code for the functions I called as they are quite long, but I can if needed. I'm wondering if I'm using multiprocessing correctly.
First off, is there a reason you are not using a Pool and the imap method of the Pool? Second, it's very hard to tell any specifics without seeing your code, especially since the error points to parts of the code that are not provided.

However, it looks like you are using multiprocessing correctly from what you have provided -- and it's a serialization problem. Note that if you use dill, you can serialize the write method. Most versions of multiprocessing use cPickle (or a version of pickle that is built in C), and while dill can inject its types into the Python version of pickle, it can't do so in the C equivalent.

There is a dill-activated fork of multiprocessing -- so you might try that; if it's purely a pickling problem, then you should get past it with multiprocess.

See: https://github.com/uqfoundation/multiprocess.
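For example, a Pool with imap might look something like this sketch (the parse_chunk payload and the stand-in chunk strings are mine, not from your code; with the fork, the only change is importing multiprocess instead of multiprocessing):

import multiprocessing as mp
#with the dill-based fork, this would be: import multiprocess as mp

def parse_chunk(chunk_text):
    #the worker receives plain text, which pickles without trouble
    return len(chunk_text.splitlines())

if __name__ == '__main__':
    chunks = ['a,b\n1,2\n', 'c,d\n3,4\n'] #stand-in chunk contents
    pool = mp.Pool(processes=mp.cpu_count())
    #imap yields results lazily and in order
    for result in pool.imap(parse_chunk, chunks):
        print result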
EDIT (after OP update): The global declaration in your helper function isn't going to play well with pickle. Why not just use a payload function (like split) that reads a portion of the file and returns the contents, or writes to the target file? Don't return a list of files. I know they are TemporaryFile objects, but unless you use dill (and even then it's touchy) you can't pickle a file. If you absolutely have to, return the file name, not the file, and don't use a TemporaryFile -- pickle will choke trying to pass the file. So, you should refactor your code, or, as I suggested earlier, see if you can bypass the serialization issues by using multiprocess (which uses dill).
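A minimal sketch of that refactoring, assuming the parent computes byte offsets and each worker opens the file itself (read_chunk and the naive offset math here are illustrative, not from your code), so only a filename and two integers ever cross the process boundary:

import os
import multiprocessing as mp

def read_chunk(args):
    #unpack a (filename, start, end) tuple; all three pickle cleanly
    filename, start, end = args
    with open(filename, 'rb') as f:
        f.seek(start)
        data = f.read(end - start)
    #stand-in for real parsing: count the lines in this slice
    return len(data.splitlines())

if __name__ == '__main__':
    filename = 'test.csv'
    size = os.path.getsize(filename)
    n = mp.cpu_count()
    step = size // n
    #naive offsets; a real version would round each boundary to a newline
    bounds = [(filename, i * step, size if i == n - 1 else (i + 1) * step)
              for i in range(n)]
    pool = mp.Pool(processes=n)
    print pool.map(read_chunk, bounds)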