Is there a way to use Parallel processing to read chunks from a file and join together the string in order?


I have seen many examples of how to add numbers using Parallel, but I have not found anything that demonstrates reading multiple chunks (say 512 bytes per chunk) in parallel from a stream and joining the results together.

I would like to know if it is possible to read multiple parts of a stream and have them concatenated together in proper order.

For example

Assume the following text file

Bird
Cats
Dogs

Reading it in chunks of 5 bytes from a normal stream would be something like:

byte[] buffer = new byte[5];
int bytesRead = 0;
StringBuilder sb = new StringBuilder();
using (Stream stream = new FileStream( "animals.txt", FileMode.Open, FileAccess.Read )) {
    while ( (bytesRead = stream.Read( buffer, 0, buffer.Length )) > 0 ) {
        // only decode the bytes actually read, in case the last chunk is short
        sb.Append( Encoding.UTF8.GetString( buffer, 0, bytesRead ) );
    }
}

This would read each line (each line is 5 bytes) and join them together in order, so the resulting string would be identical to the file.

However, using something like this solution looks like it could join them out of order. I also don't know how it would apply in the above context to replace the while loop.

How can I read those chunks simultaneously and append the bytes from each iteration to the StringBuilder, not in the order the iterations happen to complete but in the order the chunks appear in the stream, so I don't end up with something like

Cats
Bird
Dogs

Sorry, I don't have any parallel code to show, as that is the reason for this post. It seems easy if you want to sum up numbers, but having it work in the following manner:

  • Reading from a stream in byte chunks (say 512 bytes per chunk)
  • Appending to a master result in the order the chunks appear in the stream, not necessarily the order in which they are processed.

... seems to be a daunting challenge


BEST ANSWER

By their nature, streams are not compatible with parallel processing. The abstraction of a stream is sequential access.

You can read the stream content sequentially into an array, then launch parallel processing on it, which has the desired effect (processing is parallelized). You can even spawn the parallel tasks as chunks of the stream arrive.

var tasks = new List<Task>();
int bytesRead;
do {
    var buffer = new byte[blockSize];
    var location = stream.Position;
    bytesRead = stream.Read(buffer, 0, buffer.Length);
    if (bytesRead > 0)
        tasks.Add(ProcessAsync(buffer, location)); // per-chunk work runs while the next read proceeds
} while (bytesRead > 0);
await Task.WhenAll(tasks);

Or, if you have random access, you can spawn parallel tasks, each with instructions to read a particular portion of the input, process it, and store the result in the corresponding part of the output. Note that although random access to files is possible, every read still has to go through a single disk controller, and hard disks are not truly random access even though they expose a random-access interface: non-sequential read patterns waste a lot of time seeking, lowering efficiency far below what you get from streaming. (SSDs don't seek, so there is little penalty for random request patterns, but there is little benefit either.)
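
Here is a minimal sketch of that random-access approach, assuming a seekable file ("animals.txt"), a chosen blockSize, and a hypothetical Process method for the per-chunk work; each task opens its own FileStream so the reads don't fight over a shared position:

long length = new FileInfo("animals.txt").Length;
int blockCount = (int)((length + blockSize - 1) / blockSize);
byte[][] results = new byte[blockCount][];

Parallel.For(0, blockCount, i => {
    long offset = (long)i * blockSize;
    var buffer = new byte[(int)Math.Min(blockSize, length - offset)];

    // each task gets its own stream positioned at its chunk
    using (var fs = new FileStream("animals.txt", FileMode.Open, FileAccess.Read, FileShare.Read)) {
        fs.Seek(offset, SeekOrigin.Begin);
        int read = 0;
        while (read < buffer.Length)
            read += fs.Read(buffer, read, buffer.Length - read);
    }

    // storing at index i preserves the original file order
    results[i] = Process(buffer); // Process is a placeholder for whatever per-chunk work you need
});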


Thanks to @Kraang for collaborating on the following example, which matches the case of parallel processing of binary data.

If you are starting from a byte array alone, you could use parallel processing to handle the chunks as follows:

// the byte array goes here
byte[] data = new byte[N];

// the block size
int blockSize = 5;

// find how many chunks there are
int blockCount = 1 + (data.Length - 1) / blockSize;

byte[][] processedChunks = new byte[blockCount][];
Parallel.For( 0, blockCount, ( i ) => {
    var offset = i * blockSize;

    // set the buffer size to block size or remaining bytes whichever is smaller
    var buffer = new byte[Math.Min( blockSize, data.Length - offset )];

    // copy the bytes for this chunk from data into the buffer
    Buffer.BlockCopy( data, offset, buffer, 0, buffer.Length );

    // store buffer results into array in position `i` preserving order
    processedChunks[i] = Process(buffer);
} );

// recombine chunks using e.g. LINQ SelectMany
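
For instance, assuming Process returns the transformed bytes for each chunk, the recombination could look like this sketch:

// flatten the processed chunks back into one byte array, preserving order
byte[] combined = processedChunks.SelectMany(chunk => chunk).ToArray();
string result = Encoding.UTF8.GetString(combined);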