How to wrap ConcurrentDictionary in BlockingCollection?

I am trying to wrap a ConcurrentDictionary in a BlockingCollection, but have not been successful.

I understand that BlockingCollection works with collections that take a single type parameter, such as ConcurrentBag<T>, ConcurrentQueue<T>, etc.

So, to create a ConcurrentBag wrapped in a BlockingCollection I would declare and instantiate like this:

BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>());

But how do I do this for a ConcurrentDictionary? I need the blocking functionality of BlockingCollection on both the producer and the consumer side.

There are 3 best solutions below

You'll need to write your own adapter class - something like:

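using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Adapter that lets BlockingCollection<T> use a ConcurrentDictionary as its backing store.
public class ConcurrentDictionaryWrapper<TKey,TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey,TValue>>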
{
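    // Must be initialized; otherwise every member throws NullReferenceException.
    private readonly ConcurrentDictionary<TKey, TValue> dictionary =
        new ConcurrentDictionary<TKey, TValue>();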

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return dictionary.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        // ConcurrentDictionary implements ICollection.CopyTo explicitly.
        ((ICollection)dictionary).CopyTo(array, index);
    }

    public int Count
    {
        get { return dictionary.Count; }
    }

    public object SyncRoot
    {
        get { return this; }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        ((ICollection<KeyValuePair<TKey, TValue>>)dictionary).CopyTo(array, index);
    }

    public bool TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return dictionary.TryAdd(item.Key, item.Value);
    }

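    public bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        // Enumerate instead of calling FirstOrDefault(): on an empty dictionary the
        // default pair has a null key (for reference-type keys) and TryRemove throws.
        foreach (var pair in dictionary)
        {
            TValue value;
            // Another thread may have removed this key already; try the next one.
            if (dictionary.TryRemove(pair.Key, out value))
            {
                item = new KeyValuePair<TKey, TValue>(pair.Key, value);
                return true;
            }
        }
        item = default(KeyValuePair<TKey, TValue>);
        return false;
    }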

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        // Snapshot of the current contents.
        return dictionary.ToArray();
    }
}
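
As a rough sketch of how such an adapter might be plugged in: the helper below accepts any IProducerConsumerCollection of key/value pairs (an adapter like the one above is one candidate) and wraps it in a bounded BlockingCollection, so that Add blocks when the collection is full and consuming blocks when it is empty. The class name AdapterUsageSketch, the method ProduceAndConsume, the "key" + i naming and the capacity of 2 are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AdapterUsageSketch
{
    // 'adapter' is any IProducerConsumerCollection of key/value pairs
    // (hypothetical caller-supplied instance). Returns the number of items consumed.
    public static int ProduceAndConsume(
        IProducerConsumerCollection<KeyValuePair<string, int>> adapter, int itemCount)
    {
        // boundedCapacity: 2 makes Add block while two items are pending;
        // GetConsumingEnumerable blocks while the collection is empty.
        using var collection =
            new BlockingCollection<KeyValuePair<string, int>>(adapter, boundedCapacity: 2);

        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < itemCount; i++)
                collection.Add(new KeyValuePair<string, int>("key" + i, i));
            collection.CompleteAdding(); // lets the consuming loop below finish
        });

        int consumed = 0;
        foreach (KeyValuePair<string, int> pair in collection.GetConsumingEnumerable())
            consumed++;

        producer.Wait();
        return consumed;
    }
}
```

The keys are unique here on purpose: with a dictionary-backed adapter, a duplicate key makes the underlying TryAdd return false, which BlockingCollection reports as an InvalidOperationException.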

Here is an implementation of an IProducerConsumerCollection<T> collection which is backed by a ConcurrentDictionary<TKey, TValue>. The T of the collection is of type KeyValuePair<TKey, TValue>. It is very similar to Nick Jones's implementation, with some improvements:

public class ConcurrentDictionaryProducerConsumer<TKey, TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey, TValue>>
{
    private readonly ConcurrentDictionary<TKey, TValue> _dictionary;
    private readonly ThreadLocal<IEnumerator<KeyValuePair<TKey, TValue>>> _enumerator;

    public ConcurrentDictionaryProducerConsumer(
        IEqualityComparer<TKey> comparer = default)
    {
        _dictionary = new(comparer);
        _enumerator = new(() => _dictionary.GetEnumerator());
    }

    public bool TryAdd(KeyValuePair<TKey, TValue> entry)
    {
        if (!_dictionary.TryAdd(entry.Key, entry.Value))
            throw new DuplicateKeyException();
        return true;
    }

    public bool TryTake(out KeyValuePair<TKey, TValue> entry)
    {
        // Get a cached enumerator that is used only by the current thread.
        IEnumerator<KeyValuePair<TKey, TValue>> enumerator = _enumerator.Value;
        while (true)
        {
            enumerator.Reset();
            if (!enumerator.MoveNext())
                throw new InvalidOperationException();
            entry = enumerator.Current;
            if (!_dictionary.TryRemove(entry)) continue;
            return true;
        }
    }

    public int Count => _dictionary.Count;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public KeyValuePair<TKey, TValue>[] ToArray() => _dictionary.ToArray();
    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
        => _dictionary.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
        => throw new NotSupportedException();
    public void CopyTo(Array array, int index) => throw new NotSupportedException();
}

public class DuplicateKeyException : InvalidOperationException { }

Usage example:

BlockingCollection<KeyValuePair<string, Item>> collection
    = new(new ConcurrentDictionaryProducerConsumer<string, Item>());

//...

try { collection.Add(KeyValuePair.Create(key, item)); }
catch (DuplicateKeyException) { Console.WriteLine($"The {key} was rejected."); }

The collection.TryTake method removes a practically random key from the ConcurrentDictionary, which is unlikely to be desirable behavior. The performance is also not great, and the memory allocations are significant. For these reasons I can't enthusiastically recommend the above implementation. I would suggest looking instead at the ConcurrentQueueNoDuplicates<T> that I have posted here, which has proper queue behavior.

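Caution: Calling collection.TryAdd(item) does not have the expected behavior of returning false if the key exists. Any attempt to add a duplicate key invariably results in a DuplicateKeyException. The reason is that BlockingCollection<T> throws an InvalidOperationException whenever the underlying collection's TryAdd returns false, so the adapter cannot reject an item simply by returning false. For a fuller explanation look at the aforementioned other post.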
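
To make the TryAdd caveat concrete, here is a small self-contained sketch. NoDuplicatesCollection and TryAddDemo are hypothetical names invented for this illustration; the collection simply returns false from its own TryAdd when the item already exists, and the demo shows that BlockingCollection<T>.TryAdd surfaces that as an InvalidOperationException instead of returning false.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical minimal collection: refuses duplicates by returning false from TryAdd.
public sealed class NoDuplicatesCollection : IProducerConsumerCollection<int>
{
    private readonly ConcurrentDictionary<int, byte> _items =
        new ConcurrentDictionary<int, byte>();

    public bool TryAdd(int item) => _items.TryAdd(item, 0);

    public bool TryTake(out int item)
    {
        foreach (KeyValuePair<int, byte> pair in _items)
        {
            if (_items.TryRemove(pair.Key, out _))
            {
                item = pair.Key;
                return true;
            }
        }
        item = default;
        return false;
    }

    public int Count => _items.Count;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public int[] ToArray() => new List<int>(_items.Keys).ToArray();
    public void CopyTo(int[] array, int index) => ToArray().CopyTo(array, index);
    public void CopyTo(Array array, int index) => ToArray().CopyTo(array, index);
    public IEnumerator<int> GetEnumerator() => _items.Keys.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public static class TryAddDemo
{
    // Returns true if adding a duplicate through BlockingCollection.TryAdd
    // threw InvalidOperationException rather than returning false.
    public static bool DuplicateThrows()
    {
        using var collection = new BlockingCollection<int>(new NoDuplicatesCollection());
        collection.TryAdd(1); // accepted by the underlying collection
        try
        {
            collection.TryAdd(1); // underlying TryAdd returns false
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

Throwing a dedicated exception type from the adapter's TryAdd, as the implementation above does, at least lets callers tell a duplicate key apart from other failures.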

Maybe what you need is a ConcurrentDictionary of BlockingCollections:

        ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();
        int maxBoxes = 5;

        CancellationTokenSource cancelationTokenSource = new CancellationTokenSource();
        CancellationToken cancelationToken = cancelationTokenSource.Token;

        Random rnd = new Random();
        // Producer
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
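                int index;
                lock (rnd) { index = rnd.Next(0, maxBoxes); } // Random is not thread-safe
                // put the letter in the mailbox 'index'; the factory overload avoids
                // allocating a throwaway BlockingCollection on every call
                var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());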
                box.Add("some message " + index, cancelationToken);
                Console.WriteLine("Produced a letter to put in box " + index);

                // Wait, simulating an item that is expensive to produce.
                Thread.Sleep(1000);
            }
        });

        // Consumer 1
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index;
                lock (rnd) { index = rnd.Next(0, maxBoxes); } // Random is not thread-safe
                // get the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
                var message = box.Take(cancelationToken);
                Console.WriteLine("Consumed 1: " + message);

                // consuming an item costs less than producing it:
                Thread.Sleep(50);
            }
        });

        // Consumer 2
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index;
                lock (rnd) { index = rnd.Next(0, maxBoxes); } // Random is not thread-safe
                // get the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
                var message = box.Take(cancelationToken);
                Console.WriteLine("Consumed 2: " + message);

                // consuming an item costs less than producing it:
                Thread.Sleep(50);
            }
        });

        Console.ReadLine();
        cancelationTokenSource.Cancel();

This way, a consumer that is expecting something in a given mailbox (say, mailbox 3) will wait until the producer puts a letter in that mailbox.
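
A deterministic version of the same idea, as a minimal sketch: MailboxSketch and WaitForLetter are hypothetical names, and the message texts are made up. The consumer blocks on one specific mailbox, a letter dropped into a different mailbox does not wake it, and it only returns once a letter arrives in the box it is waiting on.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class MailboxSketch
{
    // Blocks a consumer on mailbox 'boxIndex' and returns the letter it receives.
    public static string WaitForLetter(int boxIndex)
    {
        var mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();

        // The consumer may ask for the box before any producer has used it;
        // GetOrAdd creates the box on demand and always returns the stored instance.
        Task<string> consumer = Task.Run(() =>
            mailBoxes.GetOrAdd(boxIndex, _ => new BlockingCollection<string>()).Take());

        // A letter for a different box is never seen by this consumer.
        mailBoxes.GetOrAdd(boxIndex + 1, _ => new BlockingCollection<string>())
                 .Add("letter for another box");

        // Only this letter unblocks the consumer.
        mailBoxes.GetOrAdd(boxIndex, _ => new BlockingCollection<string>())
                 .Add("letter for box " + boxIndex);

        return consumer.Result;
    }
}
```

Under contention the GetOrAdd factory can run more than once for the same key, but only one BlockingCollection is stored per key, so producer and consumer always end up sharing the same box.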