Watson Studio "Spark Environment" - how to increase `spark.driver.maxResultSize`?


I'm running a Spark job that reads, manipulates, and merges a large number of txt files into a single file, but I'm hitting this issue:

Py4JJavaError: An error occurred while calling o8483.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 838 tasks (1025.6 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

Is it possible to increase the size of spark.driver.maxResultSize?

Note: this question is about the WS Spark “Environments” NOT about Analytics Engine.
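
For context, outside of a managed environment this property would normally be raised when the session is created. A sketch of what I would expect to write (values are illustrative); whether the Watson Studio environment actually picks this up is exactly what I'm unsure about:

from pyspark.sql import SparkSession

# Illustrative only: raising spark.driver.maxResultSize at session creation.
# In a notebook where a SparkSession already exists, getOrCreate() returns the
# existing session and this config has no effect, which may be the case in a
# Watson Studio "Spark Environment".
spark = (
    SparkSession.builder
    .appName("merge-txt-files")
    .config("spark.driver.maxResultSize", "2g")  # default is 1g
    .getOrCreate()
)

print(spark.conf.get("spark.driver.maxResultSize"))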


1 Answer


You can increase the default value through the Ambari console if you are using an "Analytics Engine" Spark cluster instance. You can get the link and credentials for the Ambari console from the IAE instance on console.bluemix.net. From the Ambari console, add a new property under

Spark2 -> "Custom spark2-defaults" -> Add property -> spark.driver.maxResultSize = 2GB

Make sure the spark.driver.maxResultSize value is less than the driver memory, which is set in

Spark2 -> "Advanced spark2-env" -> content -> SPARK_DRIVER_MEMORY
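
As a quick sanity check after changing the property, you can read both values back from a notebook. A minimal sketch, assuming an active SparkSession named spark (the property names are standard Spark; the defaults passed to get() are only fallbacks):

# Confirm spark.driver.maxResultSize stays below the driver memory
result_size = spark.conf.get("spark.driver.maxResultSize", "1g")
driver_mem = spark.conf.get("spark.driver.memory", "1g")
print("spark.driver.maxResultSize =", result_size)
print("spark.driver.memory        =", driver_mem)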

Another suggestion, if you are just trying to create a single CSV file and don't want to change Spark conf values because you don't know how large the final file will be, is to use a function like the one below, which uses the HDFS getmerge command to produce a single CSV file, much as pandas would.

import os
import tempfile

def writeSparkDFAsCSV_HDFS(spark_df, file_location, file_name, csv_sep=',', csv_quote='"'):
    """
    It can be used to write large spark dataframe as a csv file without running 
    into memory issues while converting to pandas dataframe.
    It first writes the spark df to a temp hdfs location and uses getmerge to create 
    a single file. After adding a header, the merged file is moved to hdfs.

    Args:
        spark_df (spark dataframe) : Data object to be written to file.
        file_location (String) : Directory location of the file.
        file_name (String) : Name of file to write to.
        csv_sep (character) : Field separator to use in csv file
        csv_quote (character) : Quote character to use in csv file
    """
    # define temp and final paths
    file_path = os.path.join(file_location, file_name)
    temp_file_location = tempfile.NamedTemporaryFile().name
    temp_file_path = os.path.join(temp_file_location, file_name)

    print("Create directories")
    #create directories if not exist in both local and hdfs
    !mkdir $temp_file_location
    !hdfs dfs -mkdir $file_location
    !hdfs dfs -mkdir $temp_file_location

    # write to temp hdfs location
    print("Write to temp hdfs location : {}".format("hdfs://" + temp_file_path))
    spark_df.write.csv("hdfs://" + temp_file_path, sep=csv_sep, quote=csv_quote)


    # merge file from hadoop to local
    print("Merge and put file at {}".format(temp_file_path))
    !hdfs dfs -getmerge $temp_file_path $temp_file_path

    # Add header to the merged file
    header = ",".join(spark_df.columns)
    # remove the local .crc checksum files created by getmerge
    !rm $temp_file_location/.*crc
    # prepend the header row to the merged csv file
    line_prepender(temp_file_path, header)

    # move the final file to hdfs
    !hdfs dfs -put -f $temp_file_path $file_path

    # cleanup temp locations
    print("Cleanup..")
    !rm -rf $temp_file_location
    !hdfs dfs -rm -r $temp_file_location
    print("Done!")