Parquet.NET is generating huge parquet files in comparison with pyarrow


My application takes data from Azure Event Hubs (messages are capped at 1 MB), transforms it into a DataTable, and then saves it somewhere as a Parquet file.

The Parquet file generated by Parquet.Net is huge: it is always over 50 MB, even with the best compression method. When I read this 50 MB file using pandas and then re-write it into another file, it becomes less than 500 KB.

See below the comparison between Parquet.Net (red) and pyarrow (blue):

[screenshot: pyarrow metadata printout for both files]

As we can see, the number of columns and rows is the same, and I checked the content: it all seems okay. Note: there is one varchar(8000) column that holds a lot of data.

This is how I got the parquet metadata:

import pandas as pd
import pyarrow.parquet as pq

# pd.set_option("max_colwidth", None)
# pd.set_option("max_seq_item", None)
# pd.set_option("min_rows", 2)
# pd.set_option("max_rows", None)
# pd.set_option('max_columns', None)

parquet_file_net = pq.ParquetFile("parquetnetFile.parquet")
print(parquet_file_net.metadata)

print()

parquet_file_py = pq.ParquetFile("pyarrowFile.parquet")
print(parquet_file_py.metadata)

print()
print()

print(parquet_file_net.metadata.row_group(0))
print()
print(parquet_file_py.metadata.row_group(0))

My C# code is based on the following project, but I made some changes: https://github.com/dazfuller/datatable-to-parquet

So here is my C# code.

    public static async Task<MemoryStream> ToParquetStream(DataTable dt)
    {
        var fields = GenerateSchema(dt);
        var parquetStream = new MemoryStream();

        using (var writer = new ParquetWriter(new Schema(fields), parquetStream))
        {
            writer.CompressionMethod = CompressionMethod.Gzip;
            writer.CompressionLevel = 2;

            var range = Enumerable.Range(0, dt.Columns.Count);
            var result = await range.ForEachAsyncInParallel(async c =>
            {
                return await Task.Run(() =>
                {
                    // Determine the target data type for the column
                    var targetType = dt.Columns[c].DataType;
                    if (targetType == typeof(DateTime))
                    {
                        targetType = typeof(DateTimeOffset);
                    }

                    // Generate the value type; this ensures it can handle null values
                    var valueType = targetType.IsClass ? targetType : typeof(Nullable<>).MakeGenericType(targetType);

                    // Create an array to hold values of the required type for the column
                    var valuesArray = Array.CreateInstance(valueType, dt.Rows.Count);

                    // Get the data to be written to the parquet stream
                    for (int r = 0; r < dt.Rows.Count; r++)
                    {
                        DataRow row = dt.Rows[r];

                        // Check if the value is null; if so, store a null value
                        if (row[c] == null || row[c] == DBNull.Value)
                        {
                            valuesArray.SetValue(null, r);
                        }
                        else
                        {
                            // Store the value, but if it's a DateTime then convert it to a DateTimeOffset first
                            if (dt.Columns[c].DataType == typeof(DateTime))
                            {
                                valuesArray.SetValue(new DateTimeOffset((DateTime)row[c]), r);
                            }
                            else
                            {
                                valuesArray.SetValue(row[c], r);
                            }
                        }
                    }

                    return valuesArray;
                });
            });

            using (var rgw = writer.CreateRowGroup())
            {
                for (int c = 0; c < dt.Columns.Count; c++)
                {
                    rgw.WriteColumn(new Parquet.Data.DataColumn(fields[c], result[c]));
                }
            }
        }

        return parquetStream;
    }

    private static List<DataField> GenerateSchema(DataTable dt)
    {
        var fields = new List<DataField>(dt.Columns.Count);

        foreach (DataColumn column in dt.Columns)
        {
            // Attempt to map the column's type to a parquet data type
            var success = Enum.TryParse<DataType>(column.DataType.Name, true, out var type);

            // If the parse was not successful and the source is a DateTime, use a DateTimeOffset; otherwise default to a string
            if (!success && column.DataType == typeof(DateTime))
            {
                type = DataType.DateTimeOffset;
            }
            // In C#, float is System.Single, which is why the parse fails
            else if (!success && column.DataType == typeof(float))
            {
                type = DataType.Float;
            }
            else if (!success)
            {
                type = DataType.String;
            }

            fields.Add(new DataField(column.ColumnName, type));
        }

        return fields;
    }

    public static async Task<R[]> ForEachAsyncInParallel<T, R>(this IEnumerable<T> list, Func<T, Task<R>> func)
    {
        var tasks = new List<Task<R>>();
        foreach (var value in list)
        {
            tasks.Add(func(value));
        }
        return await Task.WhenAll<R>(tasks);
    }

So why is the file size so large?

Here are the files generated by parquet.net and pyarrow: https://easyupload.io/m/28jo48
