I'm trying to write a simple producer-consumer app where I need to read data in chunks from a file (which may be huge) and (for simple testing purposes) just write them to another file through another thread.
I've tried to follow lots of online sources but these thread synchronization tasks are difficult for me to understand and each of the examples I have found missed some important aspects for me.
I have put together pieces of code which ALMOST seem to work but there is something threads-related which is obviously wrong so I wanted to ask you for help if anyone can spot what I'm doing wrong.
If I run the program below on a test file, it finishes OK (at least for me and my test file). But if I uncomment Thread.Sleep(20) in the dequeueObjectAndWriteItToFile method (to test what happens when the producer is faster than the consumer), then, judging by the console output, the producer inserts maxQueueSize+1 data blocks into the queue and the program never finishes; it seems to be stuck in an infinite loop or something similar.
I suspect the _producerThreadWaitEventHandler.Set() call might be part of the problem, because at the moment it is called in dequeueObjectAndWriteItToFile on every iteration of the while loop. I'd like to call it only when necessary, i.e. when _producerThreadWaitEventHandler.WaitOne() has been called and the producer thread needs waking, but I don't know how to find out whether a particular thread is currently waiting in WaitOne.
There may be other synchronization problems, of course, but as I'm new to multithreading I don't know where to look first and what would be the best solution.
Note, I want to use (and understand) basic techniques (such as Monitor or AutoResetEvent) for synchronization (instead of BlockingQueue, TPL etc.) so I hope some minor tweaks to the code below would make it work.
I'll be grateful for any hint.
Thanks.
using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;

class ProducerConsumerApp : IDisposable
{
    public static string originalFilePath = @"D:\test.txt";
    public static string outputFilePath = @"D:\test_export.txt";
    public static int blockSize = 15;

    int maxQueueSize = 4; // max allowed number of objects in the queue

    EventWaitHandle _consumerThreadWaitEventHandler = new AutoResetEvent(false);
    EventWaitHandle _producerThreadWaitEventHandler = new AutoResetEvent(false);

    Thread _consumerThread;
    readonly object _lock = new object();
    Queue<byte[]> _queue = new Queue<byte[]>();

    public ProducerConsumerApp(Stream outputStream)
    {
        _consumerThread = new Thread(dequeueObjectAndWriteItToFile);
        _consumerThread.Start(outputStream);
    }

    public void enqueueObject(byte[] data)
    {
        lock (_lock)
        {
            // TODO !!!
            // Make sure the producer doesn't enqueue more objects than maxQueueSize,
            // i.e. let the producer wait until the consumer dequeues some object from the full queue
            if (_queue.Count > maxQueueSize) // would "while" be better? Doesn't seem to change anything
            {
                _producerThreadWaitEventHandler.WaitOne();
            }
            // Thread.Sleep(20); // just for testing
            _queue.Enqueue(data);
            // data being read in case of a text file:
            //string str = (data==null) ? "<null>" : System.Text.Encoding.Default.GetString(data);
            //Console.WriteLine("Enqueuing data: "+str);
        }
        _consumerThreadWaitEventHandler.Set(); // data enqueued => wake the consumerThread
    }

    public void Dispose() // called automatically (IDisposable implementer) when instance is being destroyed
    {
        enqueueObject(null); // Signal the consumer to exit.
        _consumerThread.Join(); // Wait for the consumer's thread to finish.
        _consumerThreadWaitEventHandler.Close(); // Release any OS resources.
    }

    void dequeueObjectAndWriteItToFile(object outputStream)
    {
        while (true)
        {
            // Thread.Sleep(20); // slow down the consumerThread to check what happens when the producer fully fills the queue
            // PROBLEM - the app gets into some infinite loop if I do this!!! What exactly is wrong?
            byte[] data = null;
            lock (_lock)
                if (_queue.Count > 0) // queue not empty
                {
                    data = _queue.Dequeue();
                    _producerThreadWaitEventHandler.Set();
                    // !!! This doesn't seem right - I don't want to call this in each while iteration
                    // I would like to call it only if _producerThreadWaitEventHandler.WaitOne has been called
                    // but how to check such a condition?
                    if (data == null)
                    {
                        // Console.WriteLine("Data file reading finished => let consumerThread finish and then quit the app");
                        return;
                    }
                }
            if (data != null)
            {
                ((FileStream)outputStream).Write(data, 0, data.Length); // write data from the queue to a file
                // just a test in case of a text file:
                // string str = System.Text.Encoding.Default.GetString(data);
                // Console.WriteLine("Data block retrieved from the queue and written to a file: " + str);
            } else { // empty queue => let the consumerThread wait
                _consumerThreadWaitEventHandler.WaitOne(); // No more tasks - wait for a signal
            }
        }
    }

    static void Main()
    {
        FileInfo originalFile = new FileInfo(originalFilePath);
        byte[] data = new byte[blockSize];
        int bytesRead;
        using (FileStream originalFileStream = originalFile.OpenRead()) // file input stream
        using (FileStream fileOutputStream = new FileStream(outputFilePath, FileMode.Create, FileAccess.Write))
        using (ProducerConsumerApp q = new ProducerConsumerApp(fileOutputStream))
        {
            while ((bytesRead = originalFileStream.Read(data, 0, blockSize)) > 0) // reads blocks of data from a file
            {
                // test - in case of a text file:
                //string str = System.Text.Encoding.Default.GetString(data);
                //Console.WriteLine("data block read from a file:" + str);
                if (bytesRead < data.Length)
                {
                    byte[] data2 = new byte[bytesRead];
                    Array.Copy(data, data2, bytesRead);
                    data = data2;
                }
                q.enqueueObject(data); // put the data into the queue
                data = new byte[blockSize];
            }
        }
        // because of "using" the Dispose method is going to be called in the end which will call enqueueObject(null) resulting in stopping the consumer thread
        Console.WriteLine("Finish");
    }
}
Your problem is that you wait inside a lock. The producer calls WaitOne() while still holding _lock, so the consumer blocks on its own lock statement and never reaches _producerThreadWaitEventHandler.Set(). Classic deadlock.
You would be better off using a Semaphore to limit the number of items the producer can put in the queue. Initialize the semaphore with all slots free: producerSemaphore = new Semaphore(15, 15);. In the producer, wait on the semaphore (WaitOne()), and in the consumer, call Release().
In the same fashion, you can use a Semaphore or a CountdownEvent to avoid relying on queue.Count.
Even better, you can use a ConcurrentQueue in combination with the semaphore to make sure the producer doesn't overfill the queue. Whenever you successfully dequeue an item from the queue, call producerSemaphore.Release();.
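Here is a minimal sketch of that idea, not a drop-in replacement for the code in the question. It assumes one semaphore counting free slots (the producerSemaphore above) plus a second one counting queued items, so that neither side has to look at the queue's Count. The names BoundedQueue, _freeSlots, _filledSlots and Demo are made up for illustration, and the capacity is a constructor parameter rather than the fixed 15 used above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not from the original code): a bounded queue built
// from two counting semaphores and a ConcurrentQueue.
class BoundedQueue
{
    readonly ConcurrentQueue<byte[]> _queue = new ConcurrentQueue<byte[]>();
    readonly Semaphore _freeSlots;   // counts empty slots; the producer waits on it
    readonly Semaphore _filledSlots; // counts queued items; the consumer waits on it

    public BoundedQueue(int capacity)
    {
        _freeSlots = new Semaphore(capacity, capacity); // all slots free at the start
        _filledSlots = new Semaphore(0, capacity);      // nothing queued yet
    }

    public void Enqueue(byte[] data)
    {
        _freeSlots.WaitOne();   // blocks while the queue is full; no lock is held here
        _queue.Enqueue(data);
        _filledSlots.Release(); // wake the consumer
    }

    public byte[] Dequeue()
    {
        _filledSlots.WaitOne(); // blocks while the queue is empty
        byte[] data;
        _queue.TryDequeue(out data); // an item is present: _filledSlots was signalled after Enqueue
        _freeSlots.Release();   // hand the slot back to the producer
        return data;
    }
}

static class Demo
{
    // Pushes `blocks` one-byte blocks through a queue of the given capacity,
    // with a deliberately slow consumer, and returns what the consumer received.
    public static List<byte> Run(int blocks, int capacity)
    {
        var queue = new BoundedQueue(capacity);
        var received = new List<byte>();
        var consumer = new Thread(() =>
        {
            byte[] data;
            while ((data = queue.Dequeue()) != null) // null is the end-of-data marker
            {
                Thread.Sleep(1); // slow consumer, so the producer runs into the limit
                received.Add(data[0]);
            }
        });
        consumer.Start();
        for (int i = 0; i < blocks; i++)
            queue.Enqueue(new[] { (byte)i });
        queue.Enqueue(null); // tell the consumer to stop
        consumer.Join();
        return received;
    }
}
```

Calling Demo.Run(20, 4) should hand back the 20 blocks in the order they were enqueued, even though the consumer is slower than the producer. The important difference from the code in the question is that each thread blocks only on a semaphore and never while holding a lock the other thread needs. A real version should also dispose the two semaphores when it is finished with them.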