Pyspark "TypeError: Cannot convert pyarrow.lib.ChunkedArray to pyarrow.lib.Array"


On Databricks, I have a streaming pipeline where both the bronze source and the silver target are in Delta format. I have a pandas UDF that uses the requests_cache library to retrieve something from a URL (which can be quite compute-intensive).

As long as there is no big backlog in the bronze table (low numBytesOutstanding), everything goes well: performance is good (high processing rate, low batch duration). Once a larger backlog builds up (due to an increasing stream writing from the source to bronze), performance becomes a lot worse.

At the moment, I keep getting the following error (with different cluster sizes): TypeError: Cannot convert pyarrow.lib.ChunkedArray to pyarrow.lib.Array. I didn't find much about this error, and nothing related to PySpark; only this SO topic, where it is mentioned that it can occur when the size exceeds 2 GB, but that post is already a few years old and not Spark-related.

I use a pandas_udf, so I assume it has something to do with that. Anyone with some ideas?

Edit: Not sure if it helps, but here is the pandas UDF:

import pandas as pd
import requests_cache
import pyspark.sql.functions as F
import pyspark.sql.types as T

@F.pandas_udf(T.ArrayType(T.ArrayType(T.StringType())))
def get_columns(urls: pd.Series) -> pd.Series:
    schemas = []
    session = requests_cache.CachedSession("mySession", backend="memory")
    schema_cache = {}  # avoid refetching the same URL within one batch
    for url in urls:
        if url in schema_cache:
            clean_schema = schema_cache[url]
        else:
            schema_raw = session.get(url).json()
            col_occurance = {}
            clean_schema = []
            for col_detail in schema_raw:
                if col_detail[0] in col_occurance:
                    # duplicate column name: suffix it with its occurrence count
                    col_occurance[col_detail[0]] += 1
                    clean_schema.append([
                        f"{col_detail[0]}_{col_occurance[col_detail[0]]}",
                        col_detail[1],
                    ])
                else:
                    col_occurance[col_detail[0]] = 1
                    clean_schema.append([col_detail[0], col_detail[1]])
            schema_cache[url] = clean_schema
        schemas.append(clean_schema)
    return pd.Series(schemas)
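
For context, this is roughly how the UDF is applied in the bronze-to-silver stream (a minimal sketch; the table names, checkpoint path, and the url column name are placeholders, not my exact code):

# Hypothetical wiring of the UDF into the stream; table names and paths are placeholders.
bronze_df = spark.readStream.table("bronze_table")

silver_df = bronze_df.withColumn("schema", get_columns(F.col("url")))

(silver_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/silver")
    .outputMode("append")
    .toTable("silver_table"))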

There are 2 answers below.


This looks like a bug in Spark's pandas/Arrow serialization. Here is a simplified repro:

import numpy as np
import pandas as pd
import pyspark.sql.types as T

def large_pandas_udf(iterator):
    # Yields a single pandas DataFrame whose string column totals ~2 GiB,
    # which forces Arrow to produce a ChunkedArray on conversion.
    for pdf in iterator:
        big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
        strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
        arr = np.array(strings_list)
        yield pd.DataFrame(arr, columns=['string_column'])

def large_pandas_workaround_udf(iterator):
    # Workaround: yield the same data in smaller DataFrames so that each one
    # converts to a plain Arrow Array instead of a ChunkedArray.
    for pdf in iterator:
        big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
        strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
        arr = np.array(strings_list)
        num_chunks = 20
        chunk_size = len(arr) // num_chunks
        for i in range(num_chunks + 1):  # +1 picks up any remainder rows
            yield pd.DataFrame(arr[i * chunk_size:(i + 1) * chunk_size], columns=['string_column'])

def run_udf(spark, udf=large_pandas_udf):
    schema = T.StructType([T.StructField("string_column", T.StringType())])
    # Disable the simplified traceback to get the full error
    spark.conf.set('spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled', 'false')
    df = spark.createDataFrame([['v']], schema)
    return df.mapInPandas(udf, schema).count()

Notice this also includes a workaround: the idea is to manually chunk up the pandas DataFrame so that each piece is small enough that converting it to Arrow never produces a ChunkedArray. See this pyarrow issue discussing this somewhat confusing behavior: https://github.com/apache/arrow/issues/34755
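
For reference, a minimal way to exercise the repro and the workaround (this assumes a live SparkSession named spark, e.g. the one Databricks provides):

# The single large batch triggers
# "TypeError: Cannot convert pyarrow.lib.ChunkedArray to pyarrow.lib.Array"
# run_udf(spark, large_pandas_udf)

# The manually chunked version completes
run_udf(spark, large_pandas_workaround_udf)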

A second suggestion: you could use the following configuration to control how many records are processed per Arrow batch, depending on your record size (the default is 10,000). Smaller batches help keep each Arrow batch under the 2 GB limit mentioned above.

spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch",100)