On Databricks, I have a streaming pipeline where the bronze source and silver target are in Delta format. I have a pandas UDF that uses the requests_cache library to retrieve something from a URL (which can be quite compute-intensive).
As long as there is no big backlog in the bronze table (low numBytesOutstanding), everything goes well and performance is good (high processing rate, low batch_duration). Once there is a larger backlog (due to an increasing stream writing from source to bronze), performance becomes a lot worse.
At the moment, I keep getting the following error (with different cluster sizes):
TypeError: Cannot convert pyarrow.lib.ChunkedArray to pyarrow.lib.Array
I couldn't find much about this error, and nothing related to PySpark; only this SO topic. There it is mentioned that it can happen when the size exceeds 2 GB, but that post is already a few years old and not Spark-related.
I use a pandas_udf, so I assume it has something to do with that. Anyone have some ideas?
Edit: Not sure if it helps, but here is the pandas UDF:
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
import requests_cache

@F.pandas_udf(T.ArrayType(T.ArrayType(T.StringType())))
def get_columns(urls: pd.Series) -> pd.Series:
    schemas = []
    # In-memory HTTP cache, so repeated URLs only hit the network once per batch
    session = requests_cache.CachedSession("mySession", backend="memory")
    schema_cache = {}
    for url in urls:
        if url in schema_cache:
            clean_schema = schema_cache[url]  # was schema_cache(url): a call, not a lookup
        else:
            schema_raw = session.get(url).json()
            col_occurance = {}
            clean_schema = []
            for col_detail in schema_raw:
                if col_detail[0] in col_occurance:
                    # Repeated column name: suffix it with its occurrence count
                    col_occurance[col_detail[0]] += 1
                    clean_schema.append([
                        f"{col_detail[0]}_{col_occurance[col_detail[0]]}",
                        col_detail[1]
                    ])
                else:
                    col_occurance[col_detail[0]] = 1
                    clean_schema.append([col_detail[0], col_detail[1]])
            schema_cache[url] = clean_schema
        schemas.append(clean_schema)
    return pd.Series(schemas)
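It is applied roughly like this (the column name "url" is illustrative):

# Hypothetical usage; "url" stands for whatever column holds the URLs
df = df.withColumn("schema", get_columns(F.col("url")))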
This looks like a bug in Spark's pandas serialization; here is a simplified repro:
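(A minimal sketch, not the original code; the sizes are illustrative, chosen so a single Arrow batch carries more than 2 GiB of string data, at which point pyarrow hands Spark a ChunkedArray instead of an Array and serialization fails with the TypeError above.)

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T

@F.pandas_udf(T.ArrayType(T.StringType()))
def big_payload(ids: pd.Series) -> pd.Series:
    # ~30 MB of strings per row; 100 rows in one Arrow batch is ~3 GiB,
    # past the 2 GiB limit of 32-bit Arrow offsets
    return pd.Series([["x" * 1_000_000] * 30 for _ in ids])

# The noop sink just forces execution; assumes a live Spark session
df = spark.range(100).withColumn("payload", big_payload("id"))
df.write.format("noop").mode("overwrite").save()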
There is also a workaround: manually chunk up the pandas data into pieces small enough that pyarrow doesn't return any "ChunkedArrays". See this pyarrow issue discussing this somewhat confusing behavior: https://github.com/apache/arrow/issues/34755
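One way to apply that idea without touching the UDF (assuming the per-row output is roughly bounded) is to cap how many rows Spark packs into each Arrow batch; 200 below is illustrative, pick it so rows-per-batch times bytes-per-row stays well under 2 GiB:

# Illustrative value: keep rows_per_batch * bytes_per_row well under 2 GiB
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "200")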