Looping through a List<T> in batches while ensuring items per batch are unique


Context: I have an application that allows the user to process all the mailed-in payments received that day. Sometimes an envelope may include multiple checks for the same account (think two roommates each paying their portion of a utility bill).

Restrictions: Process all the payments in batches of 10 but the Account ID must be unique per batch.

Very simplified Payment class:

public class Payment
{
    public int AccountId { get; set; }
    // ... other properties not important
}

A hypothetical collection of payments received in the mail today. Notice that the last two AccountId values are acceptable duplicates:

List<Payment> payments = new List<Payment>()
{
    new Payment() {AccountId = 1 },
    new Payment() {AccountId = 2 },
    new Payment() {AccountId = 3 },
    new Payment() {AccountId = 4 },
    new Payment() {AccountId = 5 },
    new Payment() {AccountId = 1 }, // Duplicate Account
    new Payment() {AccountId = 2 }  // Duplicate Account

    // likely hundreds more unique accounts, possibly even some more duplicates...
};

I'm using MoreLinq to try to select distinct accounts per batch but this code below is clearly not going to work. I feel like I'm close but have been unable to find a working solution. Again, the goal is to split all the payments into batches of N without duplicating the AccountId in that batch. Duplicate AccountIds must be spread across other batches so they don't cause a race condition when trying to update the customer's balance.

Edited code comments for clarity.

int batchSize = 10;
var paymentTasks = new List<Task>(batchSize);

// This linq expression is the heart of my question: How to divide the payments
// into batches while ensuring uniqueness of a particular key(s). This expression
// is close, but the DistinctBy() is obviously excluding the duplicates that
// I just intend to be distinct for that Batch(batchSize).
foreach (IEnumerable<Payment> batchOfPayments in payments.DistinctBy(a => a.AccountId).Batch(batchSize))
{
    // The rest of this method is for context only

    paymentTasks.Clear();

    foreach (Payment payment in batchOfPayments)
    {
        // Async method implementation not important
        Task paymentTask = ProcessPaymentAsync(payment);
        paymentTasks.Add(paymentTask);
    }

    // Await all the tasks in this batch to complete before starting the next batch
    await Task.WhenAll(paymentTasks);
}

Thank you for your time and for looking at my question.

TheGeneral On BEST ANSWER

If I completely understand the problem, then there are many ways to do this and the best solution would depend on your actual needs.

Assuming that:

  1. What you have described is an in-memory approach
  2. It doesn't need to hit a database
  3. It doesn't need to be producer/consumer

then a very simple (yet efficient) batch-and-queue pattern can be used with minimal allocations.

Given

public class Payment
{
   public int AccountId { get; set; }
   public Payment(int accountId) => AccountId = accountId;
}

And

public static IEnumerable<Payment[]> GetBatches(IEnumerable<Payment> source, int count)
{
   var hashset = new HashSet<int>(count);
   var batch = new List<Payment>(count);
   var leftOvers = new Queue<Payment>();

   while (true)
   {
      foreach (var item in source)
      {
         // check if batched
         if (hashset.Add(item.AccountId))
            batch.Add(item); // add to batch
         else
            leftOvers.Enqueue(item); // add to left overs

         // if we are at the batch size start a loop
         while (batch.Count == count)
         {
            yield return batch.ToArray(); // return the batch

            batch.Clear();
            hashset.Clear();

            // check the left overs
            while (leftOvers.Any() && batch.Count != count)
               if (hashset.Add(leftOvers.Peek().AccountId)) // check the batch
                  batch.Add(leftOvers.Dequeue());
               else break; // we still have a duplicate bail
         }
      }

      if(batch.Any()) yield return batch.ToArray();

      if (!leftOvers.Any()) break;

      source = leftOvers.ToList(); // allocation  :(
      hashset.Clear();
      batch.Clear();
      leftOvers.Clear();
   }
}

Note: This is fairly resource efficient, though it probably makes one small unnecessary allocation when it is down to pure leftovers (the ToList() call). I am sure that could be removed, but I'll leave that up to you. There are also further efficiencies you could add: with the use of a channel, this could easily be turned into a consumer.
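
On the channel idea, here is a rough sketch of what that might look like, assuming System.Threading.Channels and C# 8 (await foreach) are available. The names BatchChannelSketch, RunAsync, processItem and capacity are hypothetical and not part of the answer's code. The method takes ready-made batches, such as the result of GetBatches(payments, 10), so it does not change how the batches are built.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BatchChannelSketch
{
    // Hypothetical helper: pushes batches through a bounded channel and
    // processes them one batch at a time.
    public static async Task RunAsync<T>(
        IEnumerable<T[]> batches,    // e.g. GetBatches(payments, 10)
        Func<T, Task> processItem,   // e.g. ProcessPaymentAsync
        int capacity = 2)            // assumed limit on batches waiting in the channel
    {
        var channel = Channel.CreateBounded<T[]>(capacity);

        // Producer: enumerates the batches lazily and waits while the channel is full.
        var producer = Task.Run(async () =>
        {
            try
            {
                foreach (var batch in batches)
                    await channel.Writer.WriteAsync(batch);
            }
            finally
            {
                channel.Writer.Complete(); // lets the consumer loop below finish
            }
        });

        // Consumer: items inside a batch run concurrently, batches run in order,
        // so two payments for the same account never overlap.
        await foreach (var batch in channel.Reader.ReadAllAsync())
            await Task.WhenAll(batch.Select(processItem));

        await producer; // surfaces any exception thrown while producing
    }
}
```

Cancellation and recovery from a failing processItem are left out to keep the sketch short.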

Test

var list = new List<Payment>() {new(1), new(2), new(3), new(4), new(4), new(5), new(6), new(4), new(4), new(6), new(4)};

var batches = GetBatches(list, 3);

foreach (var batch in batches)
   Console.WriteLine(string.Join(", ",batch.Select(x => x.AccountId)));

Output

1, 2, 3
4, 5, 6
4, 6
4
4
4
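
If you want to sanity-check output like this without reading it by eye, a small checker can help. This is only a sketch: BatchCheck and IsValid are hypothetical names, not part of the answer. It confirms that no batch exceeds the batch size, no key repeats inside a batch, and no item was dropped.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchCheck
{
    // Hypothetical helper: true when every batch is non-empty, has at most
    // batchSize items, contains no repeated key, and the batches together
    // hold exactly expectedTotal items.
    public static bool IsValid<T, TKey>(
        IEnumerable<IEnumerable<T>> batches,
        Func<T, TKey> keySelector,
        int batchSize,
        int expectedTotal)
    {
        int total = 0;
        foreach (var batch in batches)
        {
            var items = batch.ToList();
            if (items.Count == 0 || items.Count > batchSize) return false;

            // Distinct keys must match the item count, otherwise a key repeats.
            if (items.Select(keySelector).Distinct().Count() != items.Count) return false;

            total += items.Count;
        }
        return total == expectedTotal;
    }
}
```

For the test list above, a call along the lines of BatchCheck.IsValid(GetBatches(list, 3), p => p.AccountId, 3, list.Count) should come back true.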

David L On

Please note that this assumes that this is NOT a real world example since it absolutely cannot prevent duplicates on the receiving end! This is simply an approach to distribute front-loaded batches without duplicates while only looping the collection once.

It relies on creating buckets on demand and tracking position both in the overall list of buckets and in the current target bucket. In addition, it uses a custom EqualityComparer to enable O(1) lookups of a Payment in each individual bucket.

public async Task Implementation(List<Payment> payments)
{
    int batchSize = 10;
    var paymentTasks = new List<Task>();
    // use a custom equality comparer to allow for O(1) lookups
    // of payments based on AccountId
    var buckets = new List<HashSet<Payment>>();

    var incompletedBucketPosition = 0;
    var currentBucketPosition = 0;

    foreach (var payment in payments)
    {
        var isAdded = false;
        while (!isAdded)
        {
            if (currentBucketPosition >= buckets.Count)
            {
                // if we have run out of buckets, we need to
                // create a new one.
                // You could also create buckets ahead of time but
                // without knowing how many duplicates to expect,
                // this could be more efficient.
                buckets.Add(new HashSet<Payment>(batchSize, new PaymentComparer()));
            }
            
            var currentBucket = buckets[currentBucketPosition];

            if (currentBucket.Count >= batchSize)
            {   
                // our batch is complete. Advance to the next incomplete bucket
                // and reset our currentBucketPosition to the incomplete position.
                currentBucketPosition = ++incompletedBucketPosition;
            }
            else
            {
                // the bucket was created with PaymentComparer, so this is a hashed
                // O(1) lookup by AccountId rather than a linear LINQ scan
                if (currentBucket.Contains(payment))
                {
                    // we can't use this batch since our new payment would be a dupe
                    // advance to the next bucket position.
                    currentBucketPosition++;
                }
                else
                {
                    currentBucket.Add(payment);
                    isAdded = true;
                    // since we've successfully added an element,
                    // reset the currentBucketPosition to the last 
                    // incomplete bucket.
                    currentBucketPosition = incompletedBucketPosition;
                }
            }
        }
    }
    
    paymentTasks.AddRange(buckets.Select(b => ProcessPaymentAsync(b)));
    await Task.WhenAll(paymentTasks);
}

public async Task ProcessPaymentAsync(HashSet<Payment> paymentBatch)
{
    Console.WriteLine(string.Join(',', paymentBatch.Select(b => b.AccountId)));
    await Task.FromResult(0);
}

public class Payment
{
    public int AccountId { get; set; }
}

public class PaymentComparer : IEqualityComparer<Payment>
{
    public bool Equals(Payment x, Payment y)
    {
        return x?.AccountId == y?.AccountId;
    }

    public int GetHashCode(Payment obj)
    {
        return obj?.AccountId.GetHashCode() ?? 0;
    }
}
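
A side note on the comparer: a HashSet<T> only uses a custom comparer for its hashed lookups when the comparer is passed to the constructor. The LINQ overload Contains(item, comparer) walks the set item by item instead. A minimal sketch, where AccountIdComparer and BucketSketch are hypothetical names and Payment is repeated only to keep the snippet self-contained:

```csharp
using System;
using System.Collections.Generic;

public class Payment
{
    public int AccountId { get; set; }
}

// Same idea as PaymentComparer: two payments are equal when their accounts match.
public class AccountIdComparer : IEqualityComparer<Payment>
{
    public bool Equals(Payment x, Payment y) => x?.AccountId == y?.AccountId;
    public int GetHashCode(Payment obj) => obj?.AccountId.GetHashCode() ?? 0;
}

public static class BucketSketch
{
    // The comparer goes into the constructor, so Add and Contains on the
    // returned set hash by AccountId instead of by object reference.
    public static HashSet<Payment> NewBucket(int batchSize) =>
        new HashSet<Payment>(batchSize, new AccountIdComparer());
}
```

With a bucket built this way, bucket.Add(payment) returns false when a payment for the same account is already present, so the duplicate check and the insert can be a single call.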

You can test it with the following:

void Main()
{
    List<Payment> payments = new List<Payment>()
    {
        new Payment() {AccountId = 1 },
        new Payment() {AccountId = 2 },
        new Payment() {AccountId = 3 },
        new Payment() {AccountId = 4 },
        new Payment() {AccountId = 5 },
        new Payment() {AccountId = 1 }, // Duplicate Account
        new Payment() {AccountId = 2 },  // Duplicate Account
        new Payment() {AccountId = 6 },
        new Payment() {AccountId = 7 },
        new Payment() {AccountId = 8 },
        new Payment() {AccountId = 9 },
        new Payment() {AccountId = 10 },
        new Payment() {AccountId = 6 },
        new Payment() {AccountId = 11 },
        new Payment() {AccountId = 12 },
        new Payment() {AccountId = 13 },
        new Payment() {AccountId = 14 },
        new Payment() {AccountId = 15 },
        new Payment() {AccountId = 15 },
        new Payment() {AccountId = 16 },
        new Payment() {AccountId = 17 },
        new Payment() {AccountId = 18 },
        new Payment() {AccountId = 6 },
        // likely hundreds more unique accounts, possibly even some more duplicates...
    };
    
    Implementation(payments).GetAwaiter().GetResult();  
}

which outputs:

1,2,3,4,5,6,7,8,9,10
1,2,6,11,12,13,14,15,16,17
15,18,6

Kyle L. On

I'm just here to post my version of @TheGeneral's answer. All credit for writing the original solution goes to him.

My version differs slightly as I've made it fully generic for use outside of OP's immediate use-case, see below:

// Extension methods must be declared inside a non-generic static class.
public static IEnumerable<IEnumerable<TSource>> BatchDuplicate<TSource, TKey>(
    this IEnumerable<TSource> source,
    Func<TSource, TKey> keySelector,
    int count
)
{
    var hashset = new HashSet<TKey>(count);
    var batch = new List<TSource>(count);
    var leftOvers = new Queue<TSource>();

    while (true)
    {
        foreach (var item in source)
        {
            // check if this key is already in the current batch
            if (hashset.Add(keySelector(item)))
            {
                batch.Add(item); // add to batch
            }
            else
            {
                leftOvers.Enqueue(item); // add to left overs
            }

            // if we are at the batch size start a loop
            while (batch.Count == count)
            {
                yield return batch.ToArray(); // return the batch

                batch.Clear();
                hashset.Clear();

                // check the left overs
                while (leftOvers.Any() && batch.Count != count)
                {
                    if (hashset.Add(keySelector(leftOvers.Peek()))) // check the batch
                    {
                        batch.Add(leftOvers.Dequeue());
                    }
                    else
                    {
                        break; // we still have a duplicate, bail
                    }
                }
            }
        }

        if (batch.Any()) yield return batch.ToArray();

        if (!leftOvers.Any()) break;

        source = leftOvers.ToList(); // allocation  :(
        hashset.Clear();
        batch.Clear();
        leftOvers.Clear();
    }
}

Usage:

class MyModel
{
    public string MyKey { get; set; }
}

// someEnumerable is an IEnumerable<MyModel>
someEnumerable.BatchDuplicate(item => item.MyKey, 10);

This allows the method to be used with any model, as long as you provide the selector for your model's key.