PySpark pandas_udf: One pd.Series element per DataFrame row


I work with a couple of PySpark UDFs which slow down my code, so I want to convert some of them to pandas UDFs. One UDF takes a list of strings as its argument (which comes from another column of the Spark DataFrame). If I declare the Python function as a pandas UDF and pass in the column with the list of strings, I get a KeyError: 0. Printing the function argument shows that it changed from a single list to a Series of lists (I guess this is expected, because the Spark DataFrame gets converted to a pandas DataFrame). However, as I need one list per row and not the full Series, I am now wondering how I can assign each list from the Series to the correct row. I tried it with a helper column "index", which I derive via:

from pyspark.sql.functions import rank
from pyspark.sql.window import Window

messagesDf = messagesDf.withColumn(
    "index",
    rank().over(Window.partitionBy().orderBy("messageId"))
)

I later use the index to access single items from the Series, but by doing so I get the warning that the whole DataFrame is being moved to a single partition (bad for performance). Is there a more sophisticated way to split the Series (I am completely new to pandas)?
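As a possible alternative (a sketch only, not something I have verified on my data), I could avoid the global window entirely with monotonically_increasing_id, which produces unique, though not consecutive, IDs without shuffling everything to one partition:

from pyspark.sql.functions import monotonically_increasing_id

# Unique (but not gap-free) 64-bit IDs, computed per partition,
# so there is no "moving all data to a single partition" warning.
messagesDf = messagesDf.withColumn("index", monotonically_increasing_id())

Since these IDs are not consecutive, though, I am not sure they can serve as a positional index into the Series.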

Edit: In order to help you visualize the problem, the original Python function looks like this:

def prepare_search_text(list_of_strings):
    if len(list_of_strings) > 3:
        ...

list_of_strings looks, for example, like this:

['John Doe', 'Mainstreet 5', '55555', 'New York']

When I change the function to

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType

@pandas_udf(ArrayType(StringType()))
def prepare_search_text(list_of_strings):
    if len(list_of_strings) > 3:
        ...

the argument list_of_strings is no longer a single list, but a whole Series of lists:

['John Doe', 'Mainstreet 5', '55555', 'New York']
['Jane Doe', 'Whatever Street 34', '77777', 'Los Angeles']
['Joe Smith', 'Dummy Street 17', '88888', 'Chicago']
...

Therefore, the inner logic of my original function can't be applied anymore.
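For completeness, this is the kind of per-element wrapping I assume would restore the original behavior (a minimal sketch, assuming Spark 3-style type hints; handle_one is a hypothetical helper standing in for my real logic):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType

@pandas_udf(ArrayType(StringType()))
def prepare_search_text(lists_of_strings: pd.Series) -> pd.Series:
    def handle_one(list_of_strings):
        # stand-in for my original per-row logic
        if len(list_of_strings) > 3:
            ...
        return list_of_strings
    # apply the per-row logic to each list in the Series
    return lists_of_strings.apply(handle_one)

With apply, each call to handle_one receives one list, i.e. one row, so the original len() check would work again, but I don't know if this is the idiomatic way to do it.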

