There is the following trick for trimming an Apache Spark DataFrame's lineage, which is especially useful for iterative computations:
import org.apache.spark.sql.DataFrame

def getCachedDataFrame(df: DataFrame): DataFrame = {
  val rdd = df.rdd.cache()
  df.sqlContext.createDataFrame(rdd, df.schema)
}
It looks like some sort of pure magic, but right now I'm wondering: why do we need to invoke the cache() method on the RDD? What is the purpose of having cache in this lineage-trimming logic?
To understand the purpose of caching, it helps to understand the two types of RDD operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. Also important: all transformations in Spark are lazy; they do not compute their results right away, but are only computed when an action requires a result to be returned to the driver program.

So Spark's transformations (like map, for example) are all lazy, because this helps Spark be smart about which calculations need to be done while creating a query plan. What does this have to do with caching?
Consider the following code:
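A minimal Scala sketch of what such code might look like (the input path, schema, and column names are made up for illustration; only the cleansedDF name and the two actions come from the text below):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("caching-example").getOrCreate()

// read in some file (path and schema are illustrative)
val rawDF: DataFrame = spark.read.json("data/events.json")

// apply some transformations; nothing is computed yet, Spark only records them in the query plan
val cleansedDF: DataFrame = rawDF
  .filter(col("value").isNotNull)
  .withColumn("value", col("value").cast("double"))

// action 1: reads the file and computes the transformations
cleansedDF.write.parquet("data/cleansed.parquet")

// action 2: reads the file and computes the transformations AGAIN
val rowCount = cleansedDF.count()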
Here we read in some file, apply some transformations, and then apply two actions to the same DataFrame: cleansedDF.write.parquet and cleansedDF.count. As the comments in the code explain, if we run the code like this we will actually compute those transformations twice: since the transformations are lazy, they only get executed when an action requires them.
How can we prevent this double computation? With caching: we can tell Spark to keep ("save") the result of some transformations so that they don't have to be calculated multiple times. That result can be kept in memory, on disk, or a mix of the two.
So with this knowledge, our code could be something like this:
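Sketching it again in the same illustrative terms, with .cache() added after the transformations:

val rawDF: DataFrame = spark.read.json("data/events.json")

val cleansedDF: DataFrame = rawDF
  .filter(col("value").isNotNull)
  .withColumn("value", col("value").cast("double"))
  .cache() // ask Spark to keep the result of these transformations around

// action 1: reads the file, computes the transformations, and caches the result
cleansedDF.write.parquet("data/cleansed.parquet")

// action 2: reuses the cached result; the transformations are not computed again
val rowCount = cleansedDF.count()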
I've adjusted the comments in this code block to highlight the difference from the previous block.
Note that .persist also exists. With .cache you use the default storage level; with .persist you can specify which storage level to use, as this SO answer nicely explains.
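For example, reusing cleansedDF from the sketch above (the storage level shown is just one of the available options):

import org.apache.spark.storage.StorageLevel

// choose the storage level explicitly instead of the default used by .cache()
val persistedDF = cleansedDF.persist(StorageLevel.MEMORY_ONLY)

// release the cached data when it is no longer needed
persistedDF.unpersist()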
Hope this helps!