Azure Spark Notebook Processing Large Text and/or Binary files


See Azure Synapse Pipeline running Spark Notebook Generates Random Errors for more background on this. I have been fighting to get an Azure Synapse Spark Notebook to process an uncompressed 778 MB IIS log file. The previous link shows some of the issues I figured out, but this one file still took me 3 days to get working. The big issue was that I was trying to read the file in binary format, var df = spark.Read().Format("binaryFile").Option("inferSchema", false).Load(sourceFile); rather than as text, var df = spark.Read().Text(sourceFile); . Spark seems to have a problem reading the entire file into memory as a single binary value.
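For comparison, here is a minimal sketch of the two read paths in a .NET for Apache Spark notebook (sourceFile is just a placeholder path, not the exact code from my notebook):

// Binary read: the whole file becomes one row with a byte[] "content" column,
// which is what caused the memory trouble for the 778 MB file.
var binaryDf = spark.Read()
    .Format("binaryFile")
    .Option("inferSchema", false)
    .Load(sourceFile);

// Text read: each CRLF-delimited line becomes its own row in a "value" column,
// so the file never has to be materialized as a single value.
var textDf = spark.Read().Text(sourceFile);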

You may well ask why I am not using some other method of processing this file. Well, the files are uploaded to a Gen 2 storage account compressed using Deflate. I tried processing them with the Copy activity without success. I could decompress the files in a Spark notebook and convert them to Parquet, but I could not get this one large file to cooperate.

So initially I tried to just process the already-uploaded uncompressed files and convert them to Parquet using a Spark notebook, and of course Mr. 778 MB would not cooperate. I eventually beat that file into submission by processing it as text. With that done, it was time to figure out the process for handling the uploaded compressed files.
So here is the situation:

  • There are three servers whose large IIS log files are uploaded compressed and stored in Gen 2 as yyyy/mm/dd/server1.cmp, yyyy/mm/dd/server2.cmp, and yyyy/mm/dd/server3.cmp.
  • These need to be moved from a transient container to a rawdata container.
  • In the process, the files need to be decompressed, converted to Parquet, and placed in the folder yyyy/mm/dd.

There is 1 answer below.

  1. I wrote an Azure Function that returns all of the files in Gen 2 storage for any folder that has the yyyy/mm/dd format.
  2. I wrote a pipeline that consists of the Azure Function, a ForEach activity, and a Spark notebook.
  3. The Spark notebook takes in the three files and processes them sequentially: the first file is written with SaveMode.Overwrite and the remaining files with SaveMode.Append (see the sketch after this list).
  4. The compressed files are read as binary, and the byte[] content from the DataFrame is processed with a DeflateStream wrapped by a PipeReader.
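For step 3, the per-file write looks roughly like this. It is only a sketch: schema, isFirstFile, and targetFolder are illustrative placeholders rather than the exact names from my notebook, and rows is the List<GenericRow> built from the decompressed file (see the decompression code further down).

// Assumed placeholders: schema, isFirstFile, targetFolder are not from the original notebook.
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

// One string column per decompressed log line (the real schema may differ).
var schema = new StructType(new[]
{
    new StructField("line", new StringType())
});

DataFrame outputDf = spark.CreateDataFrame(rows, schema);

// The first file replaces any existing output; the rest are appended.
SaveMode mode = isFirstFile ? SaveMode.Overwrite : SaveMode.Append;
outputDf.Write().Mode(mode).Parquet(targetFolder); // e.g. rawdata/yyyy/mm/dd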

See the decompression code for step 4 below:

// Namespaces used here: System.Collections.Generic, System.IO, System.IO.Compression,
// System.IO.Pipelines, System.Threading.Tasks, Microsoft.Spark.Sql.

// Read the binary compressed data: the whole .cmp file is a single row
// with its bytes in the "content" column.
byte[] rawData = df.First().GetAs<byte[]>("content");
df.Unpersist();

// The decompressed, CRLF-delimited lines are collected here.
List<GenericRow> rows = new List<GenericRow>();

using (MemoryStream stream = new MemoryStream(rawData))
{
    using (DeflateStream deflateStream = new DeflateStream(stream, CompressionMode.Decompress, true))
    {
        // Hand the decompressed stream off to ReadPipeAsync, which fills rows.
        Task readTask = Task.Run(async () => await ReadPipeAsync(PipeReader.Create(deflateStream), rows));
        readTask.Wait();
    }
}


ReadPipeAsync is copied from the Microsoft documentation; it reads the CRLF-delimited lines and ultimately puts them into the List<GenericRow> rows collection.
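In case it helps, here is a rough sketch of that pattern, adapted from the documentation's PipeReader example rather than my exact code. It splits the stream on newlines and stores each raw line as a single-column GenericRow; real IIS log parsing into separate columns would need more work.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.Sql;

static async Task ReadPipeAsync(PipeReader reader, List<GenericRow> rows)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition? position;

        do
        {
            // Look for the end of a line in the buffered data.
            position = buffer.PositionOf((byte)'\n');
            if (position != null)
            {
                ReadOnlySequence<byte> line = buffer.Slice(0, position.Value);
                string text = Encoding.UTF8.GetString(line.ToArray()).TrimEnd('\r');
                rows.Add(new GenericRow(new object[] { text }));

                // Skip past the line plus the '\n' delimiter.
                buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
            }
        } while (position != null);

        // Tell the PipeReader how much was consumed and examined.
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
        {
            break;
        }
    }

    await reader.CompleteAsync();
}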

I also had to modify the Spark configuration within the notebook (raising the Kryo serializer buffer and RPC message size limits) as follows:

%%configure -f
{
    "conf":
    {
        "spark.kryoserializer.buffer.max" : "512",
        "spark.rpc.message.maxSize" : "256"
    }
}

Hope something here saves you days of research.