Specific Spark write operation gradually increases with time in streaming application


I have a long-running Spark Streaming job. The execution time increases gradually and linearly: within 60 minutes, processing a batch goes from 40 seconds to 90 seconds.

This increase is happening at an HDFS write statement:

def write_checkpoint(self, df, event_name, no_partition_save=None, no_partition_read=None, partition_by=None, cache=True):
    # Rotate to the next checkpoint path so we never overwrite the output we read from.
    hdfs_path = self.get_next_checkpoint_path(event_name)

    if no_partition_save:
        # Repartition to a fixed number of files before writing.
        # coalesce instead of repartition can have unwanted behaviour:
        # https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
        df.repartition(no_partition_save) \
            .write \
            .mode("overwrite") \
            .save(hdfs_path)
    elif partition_by:
        # Write partitioned by the given column(s).
        df.write \
            .partitionBy(partition_by) \
            .mode("overwrite") \
            .save(hdfs_path)
    else:
        df \
            .write \
            .mode("overwrite") \
            .save(hdfs_path)

    # Read the checkpointed data back so downstream work starts from the saved copy.
    if no_partition_read:
        df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
    else:
        df_new = self.spark.read.load(hdfs_path)

    if partition_by:
        df_new = df.repartition(partition_by)
    if cache:
        df_new.cache()
    return df_new
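
For reference, the pattern the method implements is: write the DataFrame out, read it back from the checkpoint path, and cache the re-read copy. Here is a minimal, standalone sketch of that same pattern (the toy data, local path, and partition count are made up for illustration):

    # Minimal, self-contained sketch of the same checkpoint pattern:
    # write the DataFrame out, read it back, and cache the re-read copy.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").appName("checkpoint-sketch").getOrCreate()

    # Toy data and a local path stand in for the real DataFrame and HDFS path.
    df = spark.range(1000).withColumnRenamed("id", "event_id")
    path = "/tmp/checkpoint_sketch/events_0"  # stands in for get_next_checkpoint_path()

    # Same steps as the method above: write out, read back, cache the re-read copy.
    df.repartition(4).write.mode("overwrite").save(path)  # default format is parquet
    df_new = spark.read.load(path).cache()
    print(df_new.count())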


When the application starts, this save operation takes 1-2 seconds.

As time goes on, the task itself still takes 2 seconds (first picture: one completed stage that took 2 seconds), but the overall query duration increases drastically (second picture: total time 40 seconds).

[Screenshot: task that remains at 2 seconds]

[Screenshot: job that increased to 40 seconds]
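
Since the task time stays flat while the total duration grows, the extra time seems to be spent outside the tasks. One diagnostic I can think of (a sketch only, not part of the job above) is to print the plan of the DataFrame handed to write_checkpoint on each micro-batch and watch whether it keeps growing:

    # Diagnostic sketch only: print the query plan per micro-batch. If the plan
    # keeps getting longer batch after batch, growing driver-side planning work
    # would fit a flat ~2 s task time with an increasing query duration.
    def log_plan(df, batch_id):
        print(f"--- plan for batch {batch_id} ---")
        df.explain(extended=True)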

I also put in some timing logging in Python, which shows the bottleneck at the same operation.
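
The logging is essentially wall-clock timing around each step, roughly along these lines (the exact logging code isn't shown here, so treat it as an approximation):

    # Rough sketch of the timing wrapper (the real logging code is not shown in
    # the question): run a step, log its wall-clock duration, return its result.
    import logging
    import time

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("streaming_job")

    def timed(label, fn, *args, **kwargs):
        start = time.monotonic()
        result = fn(*args, **kwargs)
        log.info("%s took %.1f s", label, time.monotonic() - start)
        return result

    # Usage (names are placeholders):
    # df = timed("write_checkpoint", writer.write_checkpoint, df, "event_x")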

What could be the reason for this?
