This code works fine outside the pandas_udf, but I get this error when I run the same logic inside the UDF. To avoid name conflicts between PySpark and built-in Python functions, I explicitly imported only the specific functions I need from PySpark. I use fuzzywuzzy for string matching and nltk to split each string into substrings using the n-gram technique. The code took too long to run without a UDF, so I decided to use a pandas_udf, but I have no idea why I am getting this error.
import json
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType
from nltk.util import ngrams, everygrams
from fuzzywuzzy import process, fuzz
# Schema of the DataFrame returned by get_ccep_results: a single nullable
# string column named "Output".
model_results_schema = StructType(
    fields=[
        StructField(name="Output", dataType=StringType(), nullable=True),
    ]
)
@pandas_udf(model_results_schema, PandasUDFType.GROUPED_MAP)
def get_ccep_results(model_input):
    """Fuzzy-match every description of one group against the attribute lists.

    For each value of ``model_input['cleaned_external']`` all n-grams of the
    description are generated (``everygrams``) and fuzzy-matched against each
    entry of the free global ``attribute_list``. The best matching attribute is
    recorded, and the matched n-gram is removed from the description before the
    next attribute is tried.

    Args:
        model_input: pandas DataFrame holding the rows of one ``prod_id`` group.
            It must contain a ``cleaned_external`` column (presumably strings).
            Null/NaN descriptions are tolerated and produce ``None`` matches.

    Returns:
        pandas DataFrame with exactly one column, ``Output``, holding one JSON
        string per input row. This matches ``model_results_schema``.

    Note:
        The UDF must keep exactly one parameter. Spark inspects the arity of a
        GROUPED_MAP function to decide whether it receives ``(key, pdf)`` or
        ``(pdf)``.
    """
    # Imported here so the name is guaranteed to resolve on Spark executors;
    # the original referenced ``pd`` without importing it.
    import pandas as pd

    def ngram_filter(list1, list2, threshold=58):
        """Return the best match above ``threshold`` from each list.

        Items are ``(choice, score, ...)`` tuples as produced by
        ``process.extract``. Ties on score are broken by the longer choice, and
        then by original order. An empty string is returned for a list with no
        score strictly greater than ``threshold`` (tune as required).
        """
        def best(candidates):
            above = [t for t in candidates if t[1] > threshold]
            if not above:
                return ''
            # max() keeps the first maximal element, which matches taking the
            # head of a stable sort with reverse=True.
            return max(above, key=lambda t: (t[1], len(t[0])))[0]

        return best(list1), best(list2)

    def ngram_fuzzy_match(n_gram, attribute):
        """Score n-grams against attribute values in both directions.

        Returns ``(best attribute value, best n-gram)``; either may be ''.
        """
        ngram_to_attr = [m for gram in n_gram for m in process.extract(gram, attribute)]
        attr_to_ngram = [m for value in attribute for m in process.extract(value, n_gram)]
        return ngram_filter(ngram_to_attr, attr_to_ngram)

    def ngrams_prod_desc(prod_desc, internal_att_list):
        """Return a JSON object mapping attribute index -> [best match or None]."""
        temp_dict = {ind: [] for ind in range(len(internal_att_list))}
        # Spark nulls arrive as None/NaN; treat them as an empty description
        # instead of crashing on .split().
        remaining = '' if pd.isna(prod_desc) else prod_desc
        for ind, attribute in enumerate(internal_att_list):
            # Rebuilt every iteration because matched n-grams are removed below.
            candidates = [' '.join(gram) for gram in everygrams(remaining.split())]
            if candidates:
                match, ngram_candidate = ngram_fuzzy_match(candidates, attribute)
                temp_dict[ind].append(match)
                # Drop the matched n-gram so later attributes cannot reuse it.
                remaining = remaining.replace(ngram_candidate, '').strip()
            else:
                temp_dict[ind].append(None)
        return json.dumps(temp_dict)

    # NOTE(review): ``attribute_list`` is a driver-side global captured when
    # the UDF is pickled; confirm it is defined before the UDF is applied.
    outputs = model_input['cleaned_external'].apply(
        lambda desc: ngrams_prod_desc(desc, attribute_list)
    )
    # The returned frame must have the columns of model_results_schema.
    # pd.DataFrame([outputs]) produced one column per input row instead,
    # which Spark rejects for any group with more than one row.
    return pd.DataFrame({'Output': outputs})
# Run the grouped-map UDF once per prod_id group (the reported error surfaces
# at this call).
model_output = frame_combined.groupby("prod_id").apply(get_ccep_results)
Sample of the input DataFrame `frame_combined`:
Update: I wrapped the call in a try/except as shown below and got the following exception:
import traceback

# groupby().apply() is a lazy transformation: only plan-time problems (schema,
# missing columns, UDF registration) can be raised here. Exceptions thrown
# inside the UDF body surface only when an action (e.g. .show(), .count())
# runs, so wrap that action too when debugging UDF failures.
try:
    model_output = frame_combined.groupby(['prod_id']).apply(get_ccep_results)
except Exception:
    # print(e) shows only the final message; the full traceback includes the
    # Python worker stack trace, which is where UDF errors are reported.
    traceback.print_exc()