PySpark - Read checkpointed DataFrame


I am currently using PySpark to perform some data cleaning for a machine learning application. The last session crashed, but I had set up a checkpoint directory and checkpointed my DataFrame.

Now I have a checkpointed data directory with the following structure:

id-of-checkpoint-dir/
├── rdd-123/
└── rdd-456/

The files in the rdd-* subfolders appear to be binary (hex) files.

How can I read this checkpoint so I can continue my data preparation instead of running the whole process again?
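For reference, the checkpoint directory and the DataFrame checkpoint were presumably created with something along these lines (the path and DataFrame below are only placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder checkpoint location; the real job used its own path.
# Spark creates a randomly named subdirectory (the "id-of-checkpoint-dir"
# above) inside it, containing one rdd-<id> folder per checkpoint.
spark.sparkContext.setCheckpointDir("hdfs:///xxx/checkpoints")

# Stand-in for the real DataFrame being cleaned.
df = spark.range(10)

# Eager checkpoint: materializes the data and writes it to the directory.
df = df.checkpoint(eager=True)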

1 Answer

Answered by luzhe:

I don't know how to read a checkpointed DataFrame directly, but I do know how to read a checkpointed RDD. You can use the following code to convert a DataFrame to an RDD and checkpoint it:

# DataFrame.rdd is a property, not a method, so no parentheses.
rdd = df.rdd
rdd.checkpoint()

The code for reading the checkpointed RDD back is as follows:

from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

# Point at one of the rdd-<id> folders inside the checkpoint directory,
# then read it back with the matching deserializer (see below).
path = 'hdfs:///xxx/id-of-checkpoint-dir/rdd-123'
rdd = spark.sparkContext._checkpointFile(path, AutoBatchedSerializer(PickleSerializer()))
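Once the RDD has been recovered, a DataFrame can be rebuilt from it so the data preparation can continue. A minimal sketch, assuming the checkpointed elements deserialize back into Row objects and a SparkSession named spark is available:

# The recovered elements should be the original DataFrame's Row objects,
# so Spark can infer the schema again (passing the original schema
# explicitly is safer if you still have it).
df_restored = spark.createDataFrame(rdd)
df_restored.show(5)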

For reference, the definition of _checkpointFile is as follows; the input_deserializer argument should match the default deserializer of the RDD class: https://github.com/apache/spark/blob/dd4db21cb69a9a9c3715360673a76e6f150303d4/python/pyspark/context.py#LL1674C8-L1674C8

def _checkpointFile(self, name: str, input_deserializer: PairDeserializer) -> RDD:
    jrdd = self._jsc.checkpointFile(name)
    return RDD(jrdd, self, input_deserializer)

For example, in Spark 2.4.8 the default deserializer is AutoBatchedSerializer(PickleSerializer()): https://spark.apache.org/docs/2.4.8/api/python/pyspark.html#pyspark.RDD

class pyspark.RDD(jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))

whereas in Spark 3.4.0 it is AutoBatchedSerializer(CloudPickleSerializer()): https://spark.apache.org/docs/3.4.0/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

class pyspark.RDD(jrdd: JavaObject, ctx: SparkContext, jrdd_deserializer: pyspark.serializers.Serializer = AutoBatchedSerializer(CloudPickleSerializer()))
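Rather than hard-coding the serializer for a particular version, one option is to reuse the serializer of the running SparkContext, which should match the defaults quoted above as long as the context was created with default settings. This is only a sketch relying on internal attributes (sc.serializer, like _checkpointFile, is not public API):

# Read the checkpoint back with the same batched serializer the running
# SparkContext uses by default, instead of naming it explicitly.
sc = spark.sparkContext
rdd = sc._checkpointFile('hdfs:///xxx/id-of-checkpoint-dir/rdd-123', sc.serializer)
print(rdd.take(5))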