IAsyncEnumerator.Current returns null when the enumerators collection is not converted to a List


The first function is designed to let LINQ run async lambdas safely in parallel (including async lambdas that return no value).

So you can write collection.AsParallel().ForAllAsync(async x => await x.Action).

The second function is designed to combine multiple IAsyncEnumerables, run them in parallel, and return their results as quickly as possible.

I have the following code:

    public static async Task ForAllAsync<TSource>(
        this ParallelQuery<TSource> source, 
        Func<TSource, Task> selector,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? Math.Min(System.Environment.ProcessorCount, 128);
        using SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = source.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            
            try
            {
                await selector(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        await Task.WhenAll(tasks).ConfigureAwait(true);
    }

    public static async IAsyncEnumerable<T> ForAllAsync<TSource, T>(
        this ParallelQuery<TSource> source,
        Func<TSource, IAsyncEnumerable<T>> selector,
        int? maxDegreeOfParallelism = null,
        [EnumeratorCancellation]CancellationToken cancellationToken = default) 
        where T : new()
    {
        IEnumerable<(IAsyncEnumerator<T>, bool)> enumerators = 
            source.Select(x => (selector.Invoke(x).GetAsyncEnumerator(cancellationToken), true)).ToList();

        while (enumerators.Any())
        {
            await enumerators.AsParallel()
                .ForAllAsync(async e => e.Item2 = (await e.Item1.MoveNextAsync()), maxDegreeOfParallelism)
                .ConfigureAwait(false);
            foreach (var enumerator in enumerators)
            {
                yield return enumerator.Item1.Current;
            }
            enumerators = enumerators.Where(e => e.Item2);
        }
    }

If I remove the "ToList()" call from the second function, yield return starts returning null: enumerator.Item1.Current is null even though enumerator.Item2 (which should hold the result of MoveNextAsync()) is true.

Why?

Best answer

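This is a classic case of deferred execution. Every time you call a method that enumerates a non-materialized IEnumerable<>, the whole query runs again. In this case that means re-invoking your selector and calling GetAsyncEnumerator again, so every pass works with brand-new enumerators. The enumerators your foreach loop reads from have never had MoveNextAsync called on them, so their Current has not been set yet (hence null), and Item2 is true only because every freshly projected tuple starts out as (enumerator, true).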

Calling .ToList() materializes the IEnumerable once. Without it, the query is re-evaluated on every call to .Any(), in the call to ForAllAsync(), and again in your foreach loop.
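Applied to enumerators rather than tasks, the effect can be seen with a single async iterator. This is an illustrative sketch, not the asker's code: Words is a hypothetical one-element async iterator, and selectorCalls is a counter added only to show how often the projection runs. It assumes .NET Core 3.0 or later (for IAsyncEnumerable<T>) and a top-level-statements program.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Counts how often the Select projection runs (illustration only).
int selectorCalls = 0;

// Deferred query: each enumeration re-runs the projection and
// calls GetAsyncEnumerator() again, producing a new enumerator.
IEnumerable<IAsyncEnumerator<string>> lazy = new[] { 1 }.Select(_ =>
{
    selectorCalls++;
    return Words().GetAsyncEnumerator();
});

// These two enumerators are left undisposed for brevity.
bool lazyMoved = await lazy.First().MoveNextAsync(); // advances enumerator #1
string? lazyCurrent = lazy.First().Current;          // reads a fresh enumerator #2

// Materialized once: every later access sees the same enumerator instance.
List<IAsyncEnumerator<string>> list = lazy.ToList(); // projection runs a third time
bool listMoved = await list[0].MoveNextAsync();
string? listCurrent = list[0].Current;
await list[0].DisposeAsync();

Console.WriteLine($"lazy: moved={lazyMoved}, current is null={lazyCurrent == null}");
Console.WriteLine($"list: moved={listMoved}, current={listCurrent}");
Console.WriteLine($"selector calls: {selectorCalls}");

// Hypothetical one-element async iterator standing in for the asker's sources.
static async IAsyncEnumerable<string> Words()
{
    await Task.Yield();
    yield return "hello";
}
```

The lazy enumerator that was advanced is not the one whose Current gets read, which is exactly what happens between ForAllAsync() and the foreach loop in the question.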

The same behavior can be reproduced minimally like this:

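    using System;
    using System.Linq;
    using System.Threading.Tasks;

    var enumerable = new[] { 1 }.Select(_ => Task.Delay(10));
    await Task.WhenAll(enumerable);
    Console.WriteLine(enumerable.First().IsCompleted); // False: First() re-runs Select and starts a new Task.Delay
    enumerable = enumerable.ToList();
    await Task.WhenAll(enumerable);
    Console.WriteLine(enumerable.First().IsCompleted); // True: the List holds the tasks that were awaited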

In the first call to enumerable.First(), we end up with a different task instance than the one that we awaited in the line before it.

In the second call, we're using the same instance because the Task was already materialized into a List.
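For completeness, here is one way the merge could look with the enumerators materialized exactly once. Separately from deferred execution, note that in the question's lambda, e.Item2 = ... assigns to the lambda's own copy of the value tuple (value tuples are structs), so the flags stored in the list are never updated and Where(e => e.Item2) never drops an exhausted enumerator. The sketch below avoids that by keeping the MoveNextAsync results in an array and rebuilding the list of active enumerators each round. MergeInRoundsAsync and Numbers are hypothetical names; the sketch assumes .NET Core 3.0 or later, uses Task.WhenAll instead of AsParallel(), and drops the question's maxDegreeOfParallelism throttling and cancellation token for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var results = new List<int>();
await foreach (int x in MergeInRoundsAsync(new[] { Numbers(0, 3, 5), Numbers(100, 1, 1) }))
{
    results.Add(x);
}
Console.WriteLine(string.Join(", ", results)); // 0, 100, 1, 2

// Hypothetical merge helper: each round advances every still-active enumerator
// concurrently and yields Current only for those whose MoveNextAsync returned true.
static async IAsyncEnumerable<T> MergeInRoundsAsync<T>(IEnumerable<IAsyncEnumerable<T>> sources)
{
    // Materialize once so every round works with the same enumerator instances.
    List<IAsyncEnumerator<T>> active = sources.Select(s => s.GetAsyncEnumerator()).ToList();
    try
    {
        while (active.Count > 0)
        {
            // Task.WhenAll returns the results in the same order as 'active'.
            bool[] moved = await Task.WhenAll(active.Select(e => e.MoveNextAsync().AsTask()));

            List<IAsyncEnumerator<T>> finished = active.Where((_, i) => !moved[i]).ToList();
            active = active.Where((_, i) => moved[i]).ToList();

            // Exhausted enumerators are disposed right away.
            foreach (IAsyncEnumerator<T> e in finished)
            {
                await e.DisposeAsync();
            }

            // Only enumerators that actually advanced have a valid Current.
            foreach (IAsyncEnumerator<T> e in active)
            {
                yield return e.Current;
            }
        }
    }
    finally
    {
        // If the consumer stops early, dispose the enumerators that are still open.
        foreach (IAsyncEnumerator<T> e in active)
        {
            await e.DisposeAsync();
        }
    }
}

// Hypothetical source: yields 'count' consecutive ints starting at 'start'.
static async IAsyncEnumerable<int> Numbers(int start, int count, int delayMs)
{
    for (int i = 0; i < count; i++)
    {
        await Task.Delay(delayMs);
        yield return start + i;
    }
}
```

Because each round waits for every active source before yielding, a slow source still holds back the faster ones within that round. If the goal is to hand out each value the moment it arrives, a different design (for example, one built on System.Threading.Channels) would likely fit better.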