How to ensure parallel tasks dequeue unique entries from ConcurrentQueue<T>?


Hi, I have a ConcurrentQueue that is loaded with files from a database. These files are to be processed by parallel tasks that dequeue them. However, after some time I start getting tasks that dequeue the same file at the same time (which leads to "used by another process" errors on the file). I also get more tasks than should be allocated: I have even seen 8 tasks running at once, when the active task limit is 5.

Rough code:

private void ParseQueuedTDXFiles()
{
    while (_signalParseQueuedFilesEvent.WaitOne())
    {
        Task.Run(() => SetParsersTask());
    }
}

The _signalParseQueuedFilesEvent is set on a timer in a Windows Service. The above function then calls SetParsersTask, which uses a ConcurrentDictionary to track how many active tasks there are and to make sure they stay below _ActiveTasksLimit:

private void SetParsersTask()
{
    
    if (_ConcurrentqueuedTdxFilesToParse.Count > 0)
    {
        if (_activeParserTasksDict.Count < _ActiveTasksLimit) //ConcurrentTask Dictionary Used to control how many Tasks should run
        {
            int parserCountToStart = _ActiveTasksLimit - _activeParserTasksDict.Count;
            Parallel.For(0, parserCountToStart, parserToStart =>
            {
                lock(_concurrentQueueLock)
                    Task.Run(() => PrepTdxParser());
            });
        }
    }

}

Which then calls this function which dequeues the Concurrent Queue:

private void PrepTdxParser()
{
    TdxFileToProcessData fileToProcess;
    lock (_concurrentQueueLock)
        _ConcurrentqueuedTdxFilesToParse.TryDequeue(out fileToProcess);
    if (!string.IsNullOrEmpty(fileToProcess.TdxFileName))
    {
        LaunchTDXParser(fileToProcess);
    }
}

I even put a lock on _ConcurrentqueuedTdxFilesToParse, even though I know it doesn't need one, all to make sure I never run into a situation where two tasks dequeue the same file.

This function is where I add and remove Tasks as well as launch the file parser for the dequeued file:

private void LaunchTDXParser(TdxFileToProcessData fileToProcess)
{
    string fileName = fileToProcess.TdxFileName;
    Task startParserTask = new Task(() => ConfigureAndStartProcess(fileName));
    _activeParserTasksDict.TryAdd(fileName, startParserTask);
    startParserTask.Start();
    Task.WaitAll(startParserTask);
    _activeParserTasksDict.TryRemove(fileName, out Task taskToBeRemoved);
}

Can you guys help me understand why I am getting the same file dequeued in two different Tasks? And why I am getting more Tasks than the _ActiveTasksLimit?

2 Answers

Datboydozy (Best Answer)

So I fixed my problem. The first part of the solution was to not add more parallelism than needed. I was trying to create a situation where SetParsersTask() would not be held up by tasks that still needed to finish processing a file, so I foolishly threw in Parallel.For on top of Task.Run, which already runs work in parallel. I fixed this by generating fire-and-forget tasks in a normal for loop instead of Parallel.For:

private void SetParsersTask()
{
    if (_queuedTdxFilesToParse.Count > 0)
    {
        if (_activeParserTasksDict.Count < _tdxParsersInstanceCount)
        {
            int parserCountToStart = _tdxParsersInstanceCount - _activeParserTasksDict.Count;
            _queuedTdxFilesToParse = new ConcurrentQueue<TdxFileToProcessData>(_queuedTdxFilesToParse.Distinct());
            for (int i = 0; i < parserCountToStart; i++)
            {
                Task.Run(() => PrepTdxParser());
            }
            
        }
    }

}

After that I was still getting the occasional duplicate file, so I moved the queue loading to another long-running thread. That thread uses an AutoResetEvent so that the queue is populated only once at any instant, as opposed to another task potentially loading it with duplicate files. It may be that both my enqueue and my dequeue were responsible, and both are now addressed:

var _loadQueueTask = Task.Factory.StartNew(() => LoadQueue(), TaskCreationOptions.LongRunning);

private void LoadQueue()
{
    while (_loadConcurrentQueueEvent.WaitOne())
    {
        if (_queuedTdxFilesToParse.Count < _tdxParsersInstanceCount)
        {
            int numFilesToGet = _tdxParsersInstanceCount - _activeParserTasksDict.Count;
            var filesToAdd = ServiceDBHelper.GetTdxFilesToEnqueueForProcessingFromDB(numFilesToGet);
            foreach (var fileToProc in filesToAdd)
            {
                ServiceDBHelper.UpdateTdxFileToProcessStatusAndUpdateDateTime(fileToProc.TdxFileName, 1, DateTime.Now);
                _queuedTdxFilesToParse.Enqueue(fileToProc);
            }

        }
    }
}

Thanks to Theo for pointing me to additional tools and making me look closer at my parallel loops.

Theodor Zoulias

There are a number of red flags in this¹ code:

  1. Using a WaitHandle. This tool is too primitive. I've never seen a problem solved with WaitHandles that couldn't be solved in a simpler way without them.
  2. Launching Task.Run tasks in a fire-and-forget fashion.
  3. Launching a Parallel.For loop without configuring the MaxDegreeOfParallelism. This practically guarantees that the ThreadPool will get saturated.
  4. Protecting a queue (_queuedTdxFilesToParse) with a lock (_concurrentQueueLock) only partially. If the queue is a Queue<T>, you must protect it on each and every operation, otherwise the behavior of the program is undefined. If the queue is a ConcurrentQueue<T>, there is no need to protect it because it is thread-safe by itself.
  5. Calling Task.Factory.StartNew and Task.Start without configuring the scheduler argument.
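To make point 4 concrete, here is a minimal standalone sketch (names are illustrative, not from the question's code) showing that `ConcurrentQueue<T>.TryDequeue` is atomic on its own: several concurrent consumers drain the same queue with no lock, and no item is ever handed out twice:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class QueueDemo
{
    // Drains the queue from `workers` concurrent consumers and returns every
    // item that was handed out, so a duplicate dequeue would show up in the result.
    public static int[] DrainConcurrently(ConcurrentQueue<int> queue, int workers)
    {
        var seen = new ConcurrentBag<int>();
        Parallel.For(0, workers, _ =>
        {
            while (queue.TryDequeue(out int item)) // atomic; returns false when empty
                seen.Add(item);
        });
        return seen.ToArray();
    }

    public static void Main()
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(0, 1000));
        int[] seen = DrainConcurrently(queue, 4);
        Console.WriteLine(seen.Length);             // 1000: nothing lost
        Console.WriteLine(seen.Distinct().Count()); // 1000: nothing dequeued twice
    }
}
```

If duplicates still appear in a program like the one in the question, the cause is elsewhere (for example, the same file being enqueued twice), not in `TryDequeue`.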

So I am not surprised that your code is not working as expected. I can't point to a specific error that needs to be fixed; to me the whole approach is dubious and needs to be reworked/scrapped. Some concepts and tools that you might want to research before attempting to rewrite this code:

  1. The producer-consumer pattern.
  2. The BlockingCollection<T> class.
  3. The TPL Dataflow library.
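As a sketch of the producer-consumer pattern with `BlockingCollection<T>` (a standalone example with illustrative names, not a drop-in replacement for the service code): a fixed number of long-running consumers read from one collection, so the degree of parallelism is capped by construction and no dictionary bookkeeping is needed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class ProducerConsumerSketch
{
    // Processes every item with exactly `consumerCount` long-running consumers.
    // GetConsumingEnumerable hands each item to exactly one consumer.
    public static List<int> Run(IEnumerable<int> items, int consumerCount)
    {
        using var queue = new BlockingCollection<int>(boundedCapacity: 100);
        var processed = new ConcurrentBag<int>();

        // Consumers: block on the collection until the producer completes it.
        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                foreach (int item in queue.GetConsumingEnumerable())
                    processed.Add(item); // real code would parse the file here
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        // Producer: enqueue everything, then signal that no more items are coming.
        foreach (int item in items) queue.Add(item);
        queue.CompleteAdding();

        Task.WaitAll(consumers);
        return processed.OrderBy(x => x).ToList();
    }

    public static void Main()
    {
        Console.WriteLine(Run(Enumerable.Range(0, 50), 5).Count); // 50
    }
}
```

`CompleteAdding` replaces the WaitHandle-driven loops: consumers exit their `foreach` naturally when the collection is drained and completed.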

Optionally you could consider familiarizing yourself with asynchronous programming. It can help reduce the number of threads that your program uses while running, resulting in a more efficient and scalable program. Two powerful asynchronous tools are the Channel<T> class and the Parallel.ForEachAsync API (available from .NET 6 onward).
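A minimal sketch of the Channel<T> plus Parallel.ForEachAsync combination (requires .NET 6 or later; all names are illustrative): a producer writes into a bounded channel, and ForEachAsync consumes it with an explicit MaxDegreeOfParallelism, so the "too many tasks" problem cannot occur.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

static class ChannelSketch
{
    // A single producer writes items into a channel; Parallel.ForEachAsync
    // reads them with a fixed MaxDegreeOfParallelism. Each item is delivered
    // to exactly one worker.
    public static async Task<int> RunAsync(int itemCount, int maxParallelism)
    {
        var channel = Channel.CreateBounded<int>(capacity: 100);
        int processed = 0;

        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < itemCount; i++)
                await channel.Writer.WriteAsync(i);
            channel.Writer.Complete(); // no more items
        });

        await Parallel.ForEachAsync(
            channel.Reader.ReadAllAsync(),
            new ParallelOptions { MaxDegreeOfParallelism = maxParallelism },
            async (item, ct) =>
            {
                // Real code would parse the file here.
                Interlocked.Increment(ref processed);
                await Task.Yield();
            });

        await producer;
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(RunAsync(200, 5).GetAwaiter().GetResult()); // 200
    }
}
```

The bounded channel also gives backpressure for free: the producer awaits when the channel is full, so the queue can never grow without limit.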

¹ This answer was intended for a related question that is now deleted.