I work with a couple of PySpark UDFs which slow down my code, so I want to convert some of them to pandas UDFs. One UDF takes a list of strings as its argument (which comes from another column of the Spark DataFrame). If I declare the Python function as a pandas UDF and pass it the column with the lists of strings, I get a KeyError: 0. Printing the function argument shows that it changed from a single list to a whole Series of lists (I guess this is expected, because each batch of the Spark DataFrame is handed to the UDF as a pandas Series). However, as I need one list per row and not the full Series, I am now wondering how I can assign each list from the Series to the correct row. I tried it with a helper column "index", which I derive via:
from pyspark.sql.functions import rank
from pyspark.sql.window import Window

messagesDf = messagesDf.withColumn(
    "index",
    rank().over(Window.partitionBy().orderBy("messageId"))
)
I later use the index to access single items from the Series, but by doing so I get the warning that the whole DataFrame is being moved to a single partition (bad for performance). Is there a more sophisticated way to split up the Series (I am all new to pandas)?
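I also came across monotonically_increasing_id(), which seems to avoid the single-partition shuffle because the ids are computed per partition, though I am not sure whether its non-consecutive ids are suitable for matching rows here:

from pyspark.sql.functions import monotonically_increasing_id

# Ids are unique and increasing, but not consecutive; no shuffle needed.
messagesDf = messagesDf.withColumn("index", monotonically_increasing_id())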
Edit: To help you visualize the problem, the original Python function looks like this:
def prepare_search_text(list_of_strings):
    if len(list_of_strings) > 3:
        ...
list_of_strings looks like this, for example:
['John Doe', 'Mainstreet 5', '55555', 'New York']
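For context, this is roughly how I apply the original function today as a plain row-at-a-time UDF (the column names search_terms and search_text are just assumptions for this sketch, not my real schema):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Register the plain Python function as a row-at-a-time UDF.
prepare_search_text_udf = udf(prepare_search_text, ArrayType(StringType()))
messagesDf = messagesDf.withColumn(
    "search_text", prepare_search_text_udf("search_terms")
)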
When I change the function to
@pandas_udf(ArrayType(StringType()))
def prepare_search_text(list_of_strings):
    if len(list_of_strings) > 3:
        ...
the argument list_of_strings is no longer a single list but a whole pandas Series of lists:
['John Doe', 'Mainstreet 5', '55555', 'New York']
['Jane Doe', 'Whatever Street 34', '77777', 'Los Angeles']
['Joe Smith', 'Dummy Street 17', '88888', 'Chicago']
...
Therefore, the inner logic of my original function can't be applied anymore.
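In case it clarifies what I am after: here is a minimal sketch of what I imagine the pandas UDF could look like, wrapping the per-row logic with Series.apply (I have not verified that this is the idiomatic approach, and the helper name process_one is my invention):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType

@pandas_udf(ArrayType(StringType()))
def prepare_search_text(list_of_strings: pd.Series) -> pd.Series:
    def process_one(strings):
        # Original per-row logic goes here.
        if len(strings) > 3:
            ...
        return strings

    # Apply the per-row logic to each list in the batch Series.
    return list_of_strings.apply(process_one)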