I am having serious difficulty understanding why I cannot run a transform which, after running for many minutes (sometimes hours), fails with the error "Serialized Results too large".
In the transform I have a list of dates that I iterate over in a for loop to perform delta calculations within specific time intervals.
The expected dataset is the union of the per-iteration datasets and should contain about 450k rows, which is not that many, but I end up with a huge number of compute stages, tasks and attempts!!
The profile is already set to Medium, and I can't scale to a larger profile or set maxResultSize = 0.
Example of code:
Date_list = [...]  # all weeks from '2021-01-01' to '2022-01-01' --> ~50 elements

df_total = spark.createDataFrame([], schema)
df_date = []

for date in Date_list:
    tmp = df.filter(F.col('date').between(F.date_sub(F.lit(date), 7), F.lit(date))) \
            .withColumn('example', F.lit(date))
    # ........
    df2 = df.join(tmp, 'column', 'inner')  # .......
    df_date.append(df2)

df_total = df_total.unionByName(union_many(*df_date))
return df_total
Don't pay attention to the syntax; this is just an example to show that there is a series of operations inside the loop. My desired output is a dataframe that contains the dataframe from each iteration!
Thank you!!
Initial Theory
You are hitting a known limitation of Spark, similar to the findings discussed over here.
However, there are ways to work around this by re-thinking your implementation to instead be a series of dispatched instructions describing the batches of data you wish to operate on, similar to how you create your `tmpDataFrame`. This may unfortunately require quite a bit more work to re-think your logic in this way, since you'll want to express your manipulations purely as a series of column manipulation commands given to PySpark instead of row-by-row manipulations. There are some operations you cannot do purely using PySpark calls, so this isn't always possible; in general it's worth thinking through very carefully.
Concretely
As an example, your date range calculations can be performed purely in PySpark and will be substantially faster if you run these operations over many years or at otherwise increased scale. Instead of using a Python list comprehension or other driver-side logic, we use column manipulations on a small set of initial data to build up our ranges.
I've written up some example code below on how you can create your date batches; this should let you perform a `join` to create your `tmpDataFrame`, after which you can describe the types of operations you wish to do to it.

Code to create date ranges (start and end dates of each week of the year):
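A minimal sketch of that idea in PySpark (assuming Spark 2.4+ for `F.sequence`; the `week_start` / `week_end` column names are just illustrative):

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Build one row per week between the start and end dates using pure column
# operations, instead of constructing the list of dates on the driver.
date_ranges = (
    spark.range(1)  # single-row anchor for the literal start/end dates
    .select(
        F.explode(
            F.sequence(
                F.to_date(F.lit("2021-01-01")),
                F.to_date(F.lit("2022-01-01")),
                F.expr("interval 7 days"),
            )
        ).alias("week_start")
    )
    .withColumn("week_end", F.date_add(F.col("week_start"), 6))
)
# -> one row per week: (2021-01-01, 2021-01-07), (2021-01-08, 2021-01-14), ...
```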
Potential Optimizations
Once you have this code, and if you are unable to express your work through joins / column derivations alone and are forced to perform the operation with `union_many`, you may consider using Spark's `localCheckpoint` feature on your `df2` result. This will allow Spark to simply calculate the resultant DataFrame and not add its query plan onto the result you push into your `df_total`. This could be paired with `cache` to also keep the resultant DataFrame in memory, but this will depend on your data scale. `localCheckpoint` and `cache` are useful for avoiding re-computing the same DataFrames many times over and for truncating the amount of query planning done on top of your intermediate DataFrames.

You'll likely find `localCheckpoint` and `cache` can be useful on your `df` DataFrame as well, since it will be used many times over in your loop (assuming you are unable to re-work your logic to use SQL-based operations and are instead still forced to use the loop).

As a quick and dirty summary of when to use each:
- Use `localCheckpoint` on a DataFrame that was complex to compute and is going to be used in operations later. Oftentimes these are the nodes feeding into `union`s.
- Use `cache` on a DataFrame that is going to be used many times later. This is often a DataFrame sitting outside of a for/while loop that will be called in the loop.
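In PySpark, those two calls look roughly like this (variable names are illustrative):

```python
# Keep a DataFrame that is reused in every iteration materialized.
df = df.cache()

# Compute a DataFrame now and truncate its query plan so later unions
# don't accumulate the full lineage.
df2 = df2.localCheckpoint(eager=True)
```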
All Together

Your initial code should now look like:
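A sketch of the re-worked transform, assuming the weekly `date_ranges` DataFrame from the snippet above, that `df` has a date column named `date`, and reusing the `'column'` join key from your example:

```python
import pyspark.sql.functions as F

# `date_ranges` is the weekly (week_start, week_end) DataFrame built above.
# One range join replaces the Python for-loop: every row of `df` is tagged
# with the week it falls into, so all ~50 weeks run in a single query plan.
tmp = (
    df.join(
        date_ranges,
        on=df["date"].between(F.col("week_start"), F.col("week_end")),
        how="inner",
    )
    .withColumn("example", F.col("week_start"))
)

# ... remaining per-week logic expressed as joins / column derivations ...
df_total = df.join(tmp, "column", "inner")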
Or, if you are unable to re-work your inner loop operations, you can do some optimizations like:
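For example, a sketch that keeps your original loop and `union_many` but adds `cache` and `localCheckpoint` (`eager=True` is the default, shown here for clarity):

```python
import pyspark.sql.functions as F

df = df.cache()  # `df` is read in every iteration, so keep it materialized

df_date = []
for date in Date_list:
    tmp = (
        df.filter(F.col("date").between(F.date_sub(F.lit(date), 7), F.lit(date)))
        .withColumn("example", F.lit(date))
    )
    # ........
    df2 = df.join(tmp, "column", "inner")

    # Compute df2 now and truncate its query plan, so the final union does not
    # carry ~50 nested plans (a common cause of oversized serialized results).
    df_date.append(df2.localCheckpoint(eager=True))

df_total = union_many(*df_date)
```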