Concurrent collection supporting removal of a specified item?

29.5k Views Asked by At

Quite simple: Other than ConcurrentDictionary (which I'll use if I have to but it's not really the correct concept), is there any Concurrent collection (IProducerConsumer implementation) that supports removal of specific items based on simple equality of an item or a predicate defining a condition for removal?

Explanation: I have a multi-threaded, multi-stage workflow algorithm, which pulls objects from the DB and sticks them in a "starting" queue. From there they are grabbed by the next stage, further worked on, and stuffed into other queues. This process continues through a few more stages. Meanwhile, the first stage is invoked again by its supervisor and pulls objects out of the DB, and those can include objects still in process (because they haven't finished being processed and so haven't been re-persisted with the flag set saying they're done).

The solution I am designing is a master "in work" collection; objects go in that queue when they are retrieved for processing by the first stage, and are removed after they have been re-saved to the DB as "processed" by whatever stage of the workflow completed the necessary processing. While the object is in that list, it will be ignored if it is re-retrieved by the first stage.

I had planned to use a ConcurrentBag, but the only removal method (TryTake) removes an arbitrary item from the bag, not a specified one (and ConcurrentBag is slow in .NET 4). ConcurrentQueue and ConcurrentStack also do not allow removal of an item other than the next one it'll give you, leaving ConcurrentDictionary, which would work but is more than I need (all I really need is to store the Id of the records being processed; they don't change during the workflow).

3

There are 3 best solutions below

0
On BEST ANSWER

The reason why there is no such a data structure is that all collections have lookup operation time of O(n). These are IndexOf, Remove(element) etc. They all enumerate through all elements and checking them for equality.

Only hash tables have lookup time of O(1). In concurrent scenario O(n) lookup time would lead to very long lock of a collection. Other threads will not be able to add elements during this time.

In dictionary only the cell hit by hash will be locked. Other threads can continue adding while one is checking for equality through elements in hash cell.

My advice is go on and use ConcurrentDictionary.


By the way, you are right that ConcurrentDictionary is a bit oversized for your solution. What you really need is to check quickly weather an object is in work or not. A HashSet would be a perfect for that. It does basically nothing then Add(element), Contains(element), Remove(element). There is a ConcurrentHeshSet implementation in java. For c# I found this: How to implement ConcurrentHashSet in .Net don't know how good is it.

As a first step I would still write a wrapper with HashSet interface around ConcurrentDictionary bring it up and running and then try different implementations and see performance differences.

3
On

It's really hard to make a collection thread-safe in the generic sense. There are so many factors that go into thread-safety that are outside the responsibility or purview of a library/framework class that affect the ability for it to be truly "thread-safe"... One of the drawbacks as you've pointed out is the performance. It's impossible to write a performant collection that is also thread-safe because it has to assume the worst...

The generally recommended practice is to use whatever collection you want and access it in a thread-safe way. This is basically why there aren't more thread-safe collections in the framework. More on this can be found at http://blogs.msdn.com/b/bclteam/archive/2005/03/15/396399.aspx#9534371

10
On

As already explained by it other posts its not possible to remove items from a Queue or ConcurrentQueue by default, but actually the easiest way to get around is to extend or wrap the item.

public class QueueItem
{
    public Boolean IsRemoved { get; private set; }
    public void Remove() { IsRemoved = true; }
}

And when dequeuing:

QueueItem item = _Queue.Dequeue(); // Or TryDequeue if you use a concurrent dictionary
if (!item.IsRemoved)
{
    // Do work here
}