(py)Spark checkpointing consumes driver memory


Context

I have a pySpark query that creates a rather large DAG. To shrink it, I break the lineage using checkpoint(eager=True), which normally works. Note: I do not use localCheckpoint() because I use dynamic resource allocation (see the docs for reference about this limitation).

# --> Pseudo-code! <--

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Collect distributed data sources, which results in touching a lot of files
# -> Large DAG
df1 = spark.sql("Select some data")
df2 = spark.sql("Select some other data")
df3 = ...

# Bring these DataFrames together to break the lineage and shorten the DAG
# Note: In "eager" mode this is executed right away
intermediate_results = df1.union(df2).union(df3)....

# Checkpoint to HDFS so the lineage is cut
sc.setCheckpointDir("hdfs:/...")
checkpointed_df = intermediate_results.checkpoint(eager=True)

# Now continue to do stuff
df_X = spark.sql("...")

result = checkpointed_df.join(df_X, ...)

Problem

I start the Spark session in client mode (an admin requirement) in a Docker container in a Kubernetes cluster (more precisely, a third-party product set up by the admins manages this).

When I execute my code and intermediate_results.checkpoint(eager=True) runs, two things happen:

  1. I receive a pySpark error about losing the connection to the JVM and a resulting call error:

py4j.protocol.Py4JNetworkError: Answer from Java side is empty ... Py4JError: An error occurred while calling o1540.checkpoint

This is of course a heavily shortened stack trace.

  2. The software controlling the Docker container states:

Engine exhausted available memory, consider a larger engine size.

This refers to the container's memory limit being exceeded.

Question

The only explanation I can find for the Docker container's memory limit being exceeded is that checkpoint() actually passes data through the driver at some point. Otherwise, I have no action that would deliberately collect anything to the driver. However, I didn't read anything about this in the docs.

  • Does checkpoint() actually consume memory in the driver when executed?
  • Has anybody encountered a similar error behaviour and can point out that it derives from something else?

1 Answer

Answered by protoncracker

The checkpoint() operation in Spark writes the RDD or DataFrame to reliable distributed storage (HDFS) and cuts off the lineage of transformations that led to it. This operation is designed to prevent the driver from becoming a bottleneck: the checkpointed data does not need to be collected or passed through the driver; instead, the executors write it directly to the distributed file system. So your observation doesn't quite add up, unless:

  • Even though checkpoint() truncates the lineage of the RDD/DataFrame, the driver still has to keep track of metadata about the checkpointed data and its location. This consumes some memory, but it is usually insignificant compared to the size of the data itself. How much data are you handling? Check the logs and checkpoint files to see whether this is the case; a memory leak could also be inflating it.
  • If eager=True, the checkpoint operation computes the RDD/DataFrame immediately, which can indirectly increase memory usage if that computation triggers other actions that use the driver's memory (see the short sketch after this list).
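To make the eager flag concrete, here is a minimal sketch, reusing the DataFrame names from the question's pseudo-code (the checkpoint path is a placeholder): with eager=True the data is computed and written to the checkpoint directory immediately, while with eager=False the write is deferred until the first action on the returned DataFrame.

spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")  # hypothetical path

# Eager: the union is computed and written to HDFS right here
eager_df = intermediate_results.checkpoint(eager=True)

# Lazy: only the intent to checkpoint is recorded; nothing is written yet
lazy_df = intermediate_results.checkpoint(eager=False)
lazy_df.count()  # the first action triggers the computation and the write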

Now about your errors:

  1. Ensure that your Spark configurations, such as spark.executor.memory, spark.driver.memory, spark.memory.fraction, etc., are set appropriately for the workload and the environment. In particular, the driver's JVM heap plus its overhead must fit within the memory limit the container enforces (see the sketch after this list).
  2. Large data sizes or inadequate partitioning can lead to memory issues, especially if partitions are too large for the memory allocated to each executor. Can you truncate the existing data before the checkpoint()?
  3. The error py4j.protocol.Py4JNetworkError could also indicate network issues between the Python process and the JVM, or bugs within the JVM. You can test that separately to rule it out, but it is most likely just a follow-on error from the JVM failing.
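As a starting point for the memory settings, here is a hedged sketch of how they could be passed when the session is created from Python. All values are placeholders, not recommendations, and the app name is illustrative; if the driver JVM is launched before your Python code runs (e.g. via spark-submit), the driver memory must instead be set on the command line, since it cannot be changed once the JVM has started.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("checkpoint-job")                    # illustrative name
    .config("spark.driver.memory", "4g")          # keep below the container's memory limit
    .config("spark.driver.memoryOverhead", "1g")  # Py4J / off-heap usage counts against the container too
    .config("spark.executor.memory", "8g")
    .config("spark.memory.fraction", "0.6")
    .getOrCreate()
)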

Overall, you could run the same code on a smaller amount of data to see whether the error stems from the code itself. If so, you know where to fix it. If not, you might need to optimize your code to use fewer resources, or simply get a larger engine.
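A minimal way to run such a downsized test, assuming the DataFrame names from the question (the row count and sampling fraction are arbitrary):

# Checkpoint only a small slice of the union and see whether the
# container's memory limit is still exceeded.
sample = intermediate_results.limit(10000)            # or .sample(fraction=0.01)
sample_checkpointed = sample.checkpoint(eager=True)   # same checkpoint dir as before
print(sample_checkpointed.count())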