How to break from ParallelFor loop at the lowest iteration that matches the condition?

85 Views Asked by At

I am trying to speed up the execution by parallelizing the matching between a given pattern myPattern and a long List of patterns patterns. The code I have below works fine if patterns had only one pattern matching myPattern but patterns can have multiple patterns that will match and the Parallel.For loop will write multiple i and the loop will return multiple i's if they fell under different threads. I have sorted patterns in a way that first pattern on the list is the one needed. This will work for serial version below but not for the parallel How can I break from the Parallel.For loop at the lowest iteration that matches the condition set inside the loop? For example if patterns[i] i = 26, i = 120, i= 1832 match, only get i = 26.

//Serial
for (int i = 0; i < patterns.Count; i++)
{
    bool match = MyMatcher(myPattern, patterns[i]);
    if (match)
    {
        Console.WriteLine(i.ToString());
        break;
    }
}

//Parallel
Parallel.For(0, patterns.Count, new ParallelOptions { MaxDegreeOfParallelism = 4 },
    (int i, ParallelLoopState state) =>
{
    bool match = MyMatcher(myPattern, patterns[i]);
    if (match)
    {
        Console.WriteLine(i.ToString());
        state.Stop();
    }
});
3

There are 3 best solutions below

0
Theodor Zoulias On BEST ANSWER

Instead of the Stop, you could use the Break functionality of the Parallel.For. The Break is more relaxed than the Stop. It doesn't terminate the loop instantly. It terminates it when all lower indices in the patterns list have been processed, giving the chance for a better match to be found.

Another issue is that the Parallel.For employs range partitioning, which is not suitable for your case. The range partitioning strategy splits the list in ranges, and dedicates one thread per range. In your case it is desirable to process the lower indices first, because even if you find a match in an upper index, you would still need to search all the lower indices for better matches. You can prioritize the lower indices by switching to the Parallel.ForEach method, and opting for dynamic partitioning with the Partitioner.Create method. Either the Partitioner.Create(patterns) or the Partitioner.Create(patterns, loadBalance: true) will do the job. The first option enumerates the list using the GetEnumerator of the list (synchronized), and the second enumerates it using the Count and the indexer [] of the list. The second option should be slightly more efficient because it is lock-free (source code), but the first has clearer incremental enumeration semantics. In the example below I am using the first option:

var partitioner = Partitioner.Create(patterns);

ParallelOptions options = new() { MaxDegreeOfParallelism = 4 };

ParallelLoopResult result = Parallel.ForEach(partitioner, options, (p, state) =>
{
    bool match = MyMatcher(myPattern, p);
    if (match) state.Break();
});

if (result.LowestBreakIteration.HasValue)
    Console.WriteLine(patterns[(int)result.LowestBreakIteration.Value]);
else
    Console.WriteLine("Not found"); // The Break was not called

After the completion of the loop you can examine the ParallelLoopResult.LowestBreakIteration, and see if the Break was called, and what was the lower index that called it. This index corresponds to the pattern that you are searching for.

Online demo.

0
Stanislav On

If the MyMatcher method is quite demanding on computing resources, you can split the collection into sorted bundles, and save the results in a ConcurrentBag. Otherwise, it is better to use the sequential option

using System.Collections.Concurrent;
 
var myPattern = 3;
var patterns = new List<long>();
for (int i = 0; i < 500; i++)
{
    patterns.Add(i);
}
var offset = 0;
var size = 10;
var batch = patterns.Skip(offset).Take(size).ToArray();
var result = new ConcurrentBag<long>();
while (batch.Length > 0 && result.Count == 0)
{
    offset += size;
    Parallel.For(0, batch.Length, new ParallelOptions { MaxDegreeOfParallelism = 4 }, (int i, ParallelLoopState state) =>
    { 
        if (MyMatcher(myPattern, batch[i]))
        {
            result.Add(i);
        }
    });
    batch = patterns.Skip(offset).Take(size).ToArray();
}


if (result.Count > 0)
{
    var match = result.OrderBy(x => x).FirstOrDefault();
    Console.WriteLine($"{match}");
}

Console.ReadLine();

static bool MyMatcher(long pattern, long value)
{
    if (value == 0)
        return false;
    return value % pattern == 0;
}
0
Eric J. On

I assume your intention is to stop processing as soon as you have a match. It isn't strictly possible to always do that due to how concurrency works, but it's possible to stop almost all additional processing after a match has been found.

You can get much more responsive cancellation of additional threads by using a CancellationToken and checking whether cancellation was requested in your matching algorithm.

Here's an illustration:

const int largePrime = 137077;

public static void Main()
{
    
    var patterns = GeneratePatterns(16);
    
    int myPattern = largePrime;
    
    Console.WriteLine("Serial Matching");
    
    //Serial
    for (int i = 0; i < patterns.Count; i++)
    {
        bool match = MyMatcher(myPattern, patterns[i], CancellationToken.None);
        if (match)
        {
            Console.WriteLine(i.ToString());
            break;
        }

    }

    Console.WriteLine("Parallel Matching");
    //Parallel
    
    // USE THIS TO SIGNAL PROCESSING IS DONE:
    var cancellationTokenSource = new CancellationTokenSource();
    
    Parallel.For(0, patterns.Count, new ParallelOptions { MaxDegreeOfParallelism = 4 }, (int i, ParallelLoopState state) =>
        {

            bool match = MyMatcher(myPattern, patterns[i], cancellationTokenSource.Token);
            if (match)
            {
                Console.WriteLine(i.ToString());
                // SIGNAL HERE:
                cancellationTokenSource.Cancel();
                state.Stop(); // All threads will stop soon without this
            }
        });         
}

static bool MyMatcher(int myPattern, List<int> pattern, CancellationToken token)
{
    foreach (var nr in pattern)
    {
        // CHECK FOR CANCELLATION HERE:
        if (token.IsCancellationRequested)
            break;
        if (nr % myPattern == 0)
            return true;
    }
            
    return false;
}

// THIS CODE DOESN'T MATTER MUCH. IT JUST GENERATES DATA
// TO TEST THE APPROACH WITH:
static List<List<int>> GeneratePatterns(int numberOfPatterns)
{
    List<List<int>> patterns = new List<List<int>>();
    for (int i = 0; i < numberOfPatterns; i++)
    {
        bool matchThisPattern = i % 2 == 0;
        if (matchThisPattern)
        {
            var from = i * largePrime;
            var to = from + largePrime - 1;
            patterns.Add(Enumerable.Range(from, to).ToList());
        }
    }
    
    return patterns;
}

Running this in LinqPad on a multicore computer, I typically get 1 or 2 matches returned before processing ends even though the input data is constructed so that each execution of MyMatch() will have almost exactly the same execution time (intentionally the worst-case scenario for concurrency).