PySpark error "Invalid argument, not a string or column" when running code inside a pandas_udf

304 Views Asked by At

This code works fine outside the pandas_udf, but I get this error when I try to run the same logic inside the UDF. To avoid name conflicts between PySpark functions and Python built-ins, I have explicitly imported only the specific functions I need from PySpark. I am using fuzzywuzzy for string matching and nltk to split each string into substrings using the n-gram technique. The code was taking too long to run without a UDF, so I decided to use pandas_udf, but I have no idea why I am getting this error.

import json

import pandas as pd
from fuzzywuzzy import process, fuzz
from nltk.util import ngrams, everygrams
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType, StructField, StructType

# Output schema of the grouped-map UDF: a single nullable string column
# named "Output".
model_results_schema = StructType().add("Output", StringType(), True)

@pandas_udf( model_results_schema, PandasUDFType.GROUPED_MAP )
def get_ccep_results( model_input ):
    """Fuzzy-match each description of one group against the attribute lists.

    For every value in the ``cleaned_external`` column of ``model_input``,
    all word n-grams of the description are compared with each entry of
    ``attribute_list`` using ``fuzzywuzzy.process.extract``. The best
    attribute value per entry is collected, and the n-gram that matched it
    is removed from the description before the next entry is processed.

    ``attribute_list`` is not defined in this block; it is read from the
    enclosing scope and must exist before the UDF runs.

    Args:
        model_input: pandas DataFrame holding the rows of one group; must
            contain a ``cleaned_external`` column.

    Returns:
        A pandas DataFrame with a single ``Output`` column (matching
        ``model_results_schema``) and one row per input row. Each cell is a
        JSON object mapping the position of an ``attribute_list`` entry to a
        one-element list: the matched attribute value, ``""`` when no match
        scored above the threshold, or ``null`` when nothing was left to
        match.
    """
    # Minimum fuzzy score (exclusive) a match needs to be accepted.
    # Change the threshold based on requirement.
    score_threshold = 58

    def rank_key(match):
        # Rank by score first, then prefer the longer matched string.
        return (match[1], len(match[0]))

    def best_choice(matches):
        """Return the matched string of the top-ranked match, or ''."""
        accepted = [m for m in matches if m[1] > score_threshold]
        if not accepted:
            return ''
        # max() returns the first of several equally ranked items, which is
        # the same element a stable descending sort would place first.
        return max(accepted, key=rank_key)[0]

    def ngram_fuzzy_match(n_gram, attribute):
        """Match n-grams against attribute values, and the reverse.

        Returns the best attribute value and the best n-gram ('' for either
        one when nothing scores above the threshold).
        """
        # n-gram -> attribute values: the matched strings are attribute values.
        attribute_matches = [
            match
            for gram in n_gram
            for match in process.extract(gram, attribute)
        ]
        # attribute value -> n-grams: the matched strings are n-grams.
        ngram_matches = [
            match
            for value in attribute
            for match in process.extract(value, n_gram)
        ]
        return best_choice(attribute_matches), best_choice(ngram_matches)

    def ngrams_prod_desc(prod_desc, internal_att_list):
        """Return the JSON-encoded matches for one product description."""
        # A null cell reaches the UDF as None/NaN; treat it as an empty
        # description instead of failing on .split().
        remaining = prod_desc if isinstance(prod_desc, str) else ''
        matches = {}
        for ind, attribute in enumerate(internal_att_list):
            # Create n-grams for what is left of the description.
            grams = [' '.join(gram) for gram in everygrams(remaining.split())]
            if not grams:
                matches[ind] = [None]
                continue
            # Fuzzy match between the n-grams and this attribute entry.
            attribute_value, ngram_candidate = ngram_fuzzy_match(grams, attribute)
            matches[ind] = [attribute_value]
            if ngram_candidate:
                # Drop the consumed n-gram so later entries cannot reuse it.
                remaining = remaining.replace(ngram_candidate, '').strip()
        return json.dumps(matches)

    outputs = model_input['cleaned_external'].apply(
        lambda desc: ngrams_prod_desc(desc, attribute_list)
    )
    # Build the frame with the column name declared in the output schema;
    # wrapping the Series in a list would instead produce a single wide row.
    return pd.DataFrame({'Output': outputs})

# Apply the grouped-map UDF to each prod_id group of frame_combined.
# NOTE(review): PySpark raises "Invalid argument, not a string or column"
# when a UDF-wrapped function is called with something other than a Column;
# confirm frame_combined is a Spark DataFrame here and not a pandas one.
model_output = (frame_combined.groupby(['prod_id']).apply(get_ccep_results)) # the reported error is raised on this line

frame_combined -

enter image description here

Update: I wrapped the call in a try/except block and got this exception:

# Same call as above, wrapped so the exception message can be inspected.
try:
  model_output = (frame_combined.groupby(['prod_id']).apply(get_ccep_results))
except Exception as e:
  # Only the exception message is printed; the traceback is not shown.
  print(e)

enter image description here

0

There are 0 best solutions below