I was working on a web scraping project, but processing the data was taking a lot of time, so I came up with an alternate route: scrape the raw source code of the products first, then process the data separately.
What I did is store the source code of each product, each enclosed in its own tuple, in an array, and save that array to a text file for further processing at a later stage. I save the data in chunks of 10,000 products; each text file is about 10 GB.
When I started to process the data using multiprocessing, I kept running into BrokenPipeError: [Errno 32]. Initially I was processing the data on a Windows machine; I explored a bit and found that Linux is better at managing memory, and that this error occurs when memory is fully utilized during processing.
Initially, I was storing the processed data in an array (not saving it at run time for each product). I read on Stack Overflow that I needed to save the processed data as I went, since it was eating up all the memory, so I changed the code accordingly and switched map to imap. It ran longer, but I still got the same error.
Here is my code. I am not posting the complete processing steps, as they would only add to the length of the code.
One point to note: there is a huge amount of array data for each product when processed, with each individual array holding up to 18,000 elements.
I am using an octa-core processor with 16 GB of RAM and a 500 GB SSD.
Any help would be appreciated. Thanks!
import xml.etree.cElementTree as ET
from lxml import html
import openpyxl
from openpyxl import Workbook
from lxml import etree
from lxml.etree import tostring
import pathos.multiprocessing as mp
import multiprocessing
import ast
global sourceDataList
sourceDataList=[]
global trackIndex
trackIndex=1
global failList
failList=[]
def processData(data):
    vehicalData=[]
    oemData=[]
    appendIndex=0
    # getting product link from incoming data list (tuple)
    p=data[0][1]
    # getting html source code from incoming data list (tuple)
    # and converting it to an html element
    source_code=html.fromstring(data[0][0])
    # processing data
    try:
        firstOem=source_code.xpath("//div[@id='tab-review']//tr[2]/td[2]")
        firstOem=firstOem[0].text_content().strip()
    except:
        firstOem=''
    try:
        name=source_code.xpath("//div[@id='right_title']/h1")
        name=name[0].text_content().strip()
    except:
        name=''
    # saving data in respective arrays
    vehicalData.append([firstOem,p,name,productType,brand,mfgNumber,imgOne,imgTwo,imgThree,imgFour,imgFive])
    for q in dayQtyPrice:
        vehicalData[appendIndex].append(q)
    vehicalData[appendIndex].append(specString)
    vehicalData[appendIndex].append(subAssembltString)
    vehicalData[appendIndex].append(parentAssemblyString)
    vehicalData[appendIndex].append(otherProductString)
    vehicalData[appendIndex].append(description)
    vehicalData[appendIndex].append(placement)
    for dma in makeModelArray:
        vehicalData[appendIndex].append(dma)
    oemData.append([firstOem,name,productType,brand,mfgNumber,p])
    for o in oemArray:
        oemData[appendIndex].append(o)
    print('Done !',p,len(vehicalData[0]),len(oemData[0]))
    # returning both arrays
    return (vehicalData,oemData)
def main():
    productLinks=[]
    vehicalData=[]
    oemData=[]
    # opening text file for processing list data
    with open('test.txt', encoding='utf-8') as f:
        string=f.read()
    sourceDataList=ast.literal_eval(string)
    print('Number of products:',len(sourceDataList))
    # creating pool and initiating multiprocessing
    p = mp.Pool(4) # Pool tells how many at a time
    # opening workbooks and saving data at run time
    vehicalOutBook=openpyxl.load_workbook('vehical_data_file.xlsx')
    vehicalOutSheet=vehicalOutBook.active
    oemOutBook=openpyxl.load_workbook('oem_data_file.xlsx')
    oemOutSheet=oemOutBook.active
    for d in p.imap(processData, sourceDataList):
        v=d[0][0][:18000]
        o=d[1][0][:18000]
        vehicalOutSheet.append(v)
        oemOutSheet.append(o)
    p.terminate()
    p.join()
    # saving data
    vehicalOutBook.save('vehical_data_file.xlsx')
    oemOutBook.save('oem_data_file.xlsx')

if __name__=='__main__':
    main()
I am not familiar with the pathos.multiprocessing.Pool class, but let's assume it works more or less the same as the multiprocessing.pool.Pool class. The problem is that the data in test.txt is in such a format that you must read the whole file in to parse it with ast.literal_eval, and therefore there can be no memory savings with imap.

To use imap (or imap_unordered) efficiently, instead of storing a representation (JSON?) of a single list in test.txt, store multiple product representations separated by newlines, each of which can be parsed individually, so that the file can be read and parsed line by line to yield individual products. You should have an approximate count of how many lines, and thus how many tasks, will be submitted to imap. The reason is that when you have a large number of tasks, it is more efficient to use something other than the default chunksize argument value of 1. I have included below a function to compute a chunksize value along the lines that the map function would use.

Also, it seems that your worker function processData is using one more level of nested lists than necessary. I have reverted to using the standard multiprocessing.pool.Pool class, since I know more or less how that works.

Note: I don't see where the variables makeModelArray and oemArray used in processData are defined.

You will still require a lot of storage for your final spreadsheet! If you were outputting two csv files instead, that would be a different story -- you could write them as you went along.