I have a producer-consumer scenario. The producer adds data to a BlockingCollection and another thread consumes it. When the consumer takes an item, it must write it to a text file. There is no limit on how much data the producer thread can produce, so I can't mark the collection as complete. What would be the best approach to implement the consumer? My implementation:
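Task.Factory.StartNew(() =>
{
    try
    {
        // Append to the file; the writer stays open for the lifetime of the loop.
        using (var writer = new StreamWriter(path, true))
        {
            // Blocks while the collection is empty; throws when the token is cancelled.
            foreach (var line in _collection.GetConsumingEnumerable(token))
            {
                writer.WriteLine(line);
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation requested: stop consuming.
    }
}, token);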
The problem with this is that the StreamWriter stays open even when the collection is empty. I could move the StreamWriter inside the foreach loop, but then I would have to open and close the file every time a new item arrives. Since I expect thousands of items in some cases, that would degrade performance.
I have read this post, which describes the same problem. People there suggested using ActionBlock, but that is a .NET 4.5 feature and I'm using .NET 4.
My suggestion is to have two consuming loops, one nested inside the other. The outer loop consumes the BlockingCollection<string> with an infinite wait timeout, and the inner loop consumes it with a small timeout, like 5 seconds. The text file is opened before entering the inner loop and closed after exiting it. This way the file stays open while messages arrive frequently, and is closed whenever the blocking collection goes cold.
This approach assumes that opening the file will always succeed. If it does not, the consumed line will be lost. I don't know how you would prefer to handle this situation. One idea is to open the file in a try block and, in the catch block, Add the line to _collection again before rethrowing the exception.
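The nested-loop idea described above could be sketched roughly as follows. This is a minimal illustration rather than a definitive implementation: the FileConsumer class, the Consume method, and the idleTimeoutMs parameter are hypothetical names introduced here, and the sketch assumes the collection is never marked as complete. It uses only Take and TryTake, both of which are available in .NET 4.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical helper class; the names and the idle timeout are illustrative.
static class FileConsumer
{
    public static void Consume(BlockingCollection<string> collection, string path,
        int idleTimeoutMs, CancellationToken token)
    {
        try
        {
            while (true)
            {
                // Outer loop: wait indefinitely for the first line of a burst.
                string line = collection.Take(token);

                StreamWriter writer;
                try
                {
                    writer = new StreamWriter(path, true); // append mode
                }
                catch
                {
                    // Opening failed: put the line back so it is not lost.
                    // Note that it goes to the end of the queue, so ordering may change.
                    collection.Add(line);
                    throw;
                }

                using (writer)
                {
                    do
                    {
                        writer.WriteLine(line);
                    }
                    // Inner loop: keep the file open while lines keep arriving
                    // within the idle timeout; TryTake returns false on timeout.
                    while (collection.TryTake(out line, idleTimeoutMs, token));
                }
                // The file is closed here; control returns to the blocking Take.
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested: stop consuming.
        }
    }
}
```

It could be started in place of the original loop with something like Task.Factory.StartNew(() => FileConsumer.Consume(_collection, path, 5000, token), token). The 5000 ms value mirrors the 5-second example above and would need tuning to the actual traffic pattern.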