How to invoke a consumer method as soon as BlockingCollection got populated?

127 Views Asked by At

Background:

By reading so many sources I understood BlockingCollection<T> is designed to get rid of the requirement of checking if new data is available in the shared collection between threads. if there is new data inserted into the shared collection then your consumer thread will awake immediately. So you do not have to check if new data is available for consumer thread in certain time intervals typically in a while loop.

I also have similar requirement:

  • I have a blocking collection of size 1.
  • This collection will be populated from 3 places (3 producers).
  • Currently using while loop to check whether collection has something or not.
  • Want to execute ProcessInbox() method as soon as blocking collection got a value and empty that collection, without checking if new data is available for consumer thread in certain time intervals typically in a while loop. How we can achieve it?
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
        
namespace ConsoleApp1
{
     class Program
     {
          private static BlockingCollection<int> _processingNotificationQueue = new(1);

          private static void GetDataFromQueue(CancellationToken cancellationToken)
          {
               Console.WriteLine("GDFQ called");
               int data;
               //while (!cancellationToken.IsCancellationRequested)
               while(!_processingNotificationQueue.IsCompleted)
               {
                    try
                    {
                         if(_processingNotificationQueue.TryTake(out data))
                         {
                              Console.WriteLine("Take");
                              ProcessInbox();
                         }
                    }
                    catch (Exception ex)
                    {

                    }
               }
          }
        
          private static void ProcessInbox()
          {
               Console.WriteLine("PI called");
          }
        
          private static void PostDataToQueue(object state)
          {
               Console.WriteLine("PDTQ called");
               _processingNotificationQueue.TryAdd(1);
          }
        
          private void MessageInsertedToTabale()
          {
               PostDataToQueue(new CancellationToken());
          }
        
          private void FewMessagesareNotProcessed()
          {
               PostDataToQueue(new CancellationToken());
          }
        
          static void Main(string[] args)
          {
               Console.WriteLine("Start");
               new Timer(PostDataToQueue, new CancellationToken(), TimeSpan.Zero,
                   TimeSpan.FromMilliseconds(100));
        
               // new Thread(()=> PostDataToQueue()).Start();
               new Thread(() => GetDataFromQueue(new CancellationToken())).Start();
        
               Console.WriteLine("End");
               Console.ReadKey();
          }
     }
}
1

There are 1 best solutions below

0
quetzalcoatl On

Just foreach over it. It's blocking. As long as it is not marked as completed, your foreach will HANG if the collection is empty, and will wake up as soon as new items were added.

See first ConsumingEnumerableDemo in of https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1?view=net-6.0 and imagine the consumer foreach (var item in bc.GetConsumingEnumerable()) is in another thread. The producer there has a delay between new items, so you should be able to easily tinker with it and see how consumer wakes up "relatively immediately". There's just one producer, but I don't see a problem with multiple producers, Add is thread-safe.

I can't guarantee that there's no significant delay between adding new item and waking up a sleeping consumer, because 'significant' is totally case-dependent word. There probably is some delay, at least for switching threads, but I doubt the collection does any additional throttling. I suppose it signals to wake up sleeping consumers before Add returns in the producer. And I suppose for the purposes of Inbox processing, that's probably UI thing for humans, and probably an order of 100ms delay won't be noticeable and I wouldn't expect the Add/Wakeup latency to be much below 100ms. No guarantees though.

If the 'blocking foreach' part sounds evil to you for some reason, you'll probably have to switch to a different synchronization mechanism (**). This is a BlockingCollection, right? It's not evented collection or something. The TryXXX methods are there for cases where you want to limit exceptions for some reason, and can deal with scheduling updates yourself like you do here (*).

(*) well, almost. This code you posted is missing 2 important things. Your while loop busy-spins at max speed when the collection is empty, that's usually a deadly no-no, especially for anything that runs on batteries. Consider addind some dead time to have if(hasItems) doWork; else sleep(sometime);. The other thing is try-catch. The docs say, when the collection throws, it means the collection is "done". NO MORE ITEMS EVER. No point in looping over a dead collection. The try-catch should be not inside the loop, but should encompass the loop so looping is stopped when collection is finished.

(**) I personally like RX extensions. Here it'd be a simple subject and one observer. Also, async/wait/IAsyncEnumerable are tempting, can help prevent synchronization by sleeping, but it still can end up with a busy-spinning loop if not done carefuly. And there are more choices, and the question was on BlockingCollection, so just FYI.