The first function is designed to let LINQ run async lambdas safely in parallel (with the built-in `ForAll`, an async lambda would become `async void`).
So you can write `collection.AsParallel().ForAllAsync(async x => await x.Action)`.
The second function is designed to let you combine multiple `IAsyncEnumerable`s, execute them in parallel, and return their results as quickly as possible.
I have the following code:
```csharp
public static async Task ForAllAsync<TSource>(
    this ParallelQuery<TSource> source,
    Func<TSource, Task> selector,
    int? maxDegreeOfParallelism = null)
{
    int maxAsyncThreadCount = maxDegreeOfParallelism ?? Math.Min(System.Environment.ProcessorCount, 128);
    using SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

    IEnumerable<Task> tasks = source.Select(async input =>
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await selector(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });

    await Task.WhenAll(tasks).ConfigureAwait(true);
}

public static async IAsyncEnumerable<T> ForAllAsync<TSource, T>(
    this ParallelQuery<TSource> source,
    Func<TSource, IAsyncEnumerable<T>> selector,
    int? maxDegreeOfParallelism = null,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
    where T : new()
{
    IEnumerable<(IAsyncEnumerator<T>, bool)> enumerators =
        source.Select(x => (selector.Invoke(x).GetAsyncEnumerator(cancellationToken), true)).ToList();

    while (enumerators.Any())
    {
        await enumerators.AsParallel()
            .ForAllAsync(async e => e.Item2 = (await e.Item1.MoveNextAsync()), maxDegreeOfParallelism)
            .ConfigureAwait(false);

        foreach (var enumerator in enumerators)
        {
            yield return enumerator.Item1.Current;
        }

        enumerators = enumerators.Where(e => e.Item2);
    }
}
```
If I remove the `ToList()` call from the second function, `yield return` starts returning `null`: `enumerator.Item1.Current` is often `null`, even though `enumerator.Item2` (the result from `MoveNextAsync()`) is `true`.
Why?
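This is a classic case of deferred execution. Every time you call a method that enumerates a non-materialized `IEnumerable<>`, the whole query runs again. In this case, that means re-invoking your selector and calling `GetAsyncEnumerator` again, which creates brand-new enumerators each time. With the call to `.ToList()` you materialize the `IEnumerable` once. Without it, the query is re-executed by every call to `.Any()`, by the call to `ForAllAsync()`, and by your `foreach` loop. So the `foreach` loop sees freshly created `(enumerator, true)` tuples: `Item2` is the literal `true` from the `Select`, and `Item1` is an enumerator that has never been advanced, so its `Current` is not meaningful (for a compiler-generated async iterator it is still `default`, which is `null` for reference types).

The same behavior can be reproduced minimally like this: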
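A minimal sketch, assuming `Task.Delay` as a stand-in for any task-producing selector; the class and method names are illustrative. It compares task instances with `ReferenceEquals` rather than relying on timing:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Illustrative names; Task.Delay stands in for any task-producing selector.
public static class DeferredTaskDemo
{
    public static async Task<(bool LazySame, bool ListSame)> RunAsync()
    {
        // Deferred query: every enumeration re-runs the lambda and starts a new Task.Delay.
        IEnumerable<Task> enumerable = Enumerable.Range(0, 1).Select(_ => Task.Delay(100));

        Task awaited = enumerable.First();
        await awaited;
        // First() re-executes the query, so this is a different (still running) task.
        bool lazySame = ReferenceEquals(awaited, enumerable.First());

        // Materialized: the lambda ran once and the List keeps those task instances.
        List<Task> list = enumerable.ToList();
        Task awaitedFromList = list.First();
        await awaitedFromList;
        bool listSame = ReferenceEquals(awaitedFromList, list.First());

        return (lazySame, listSame);
    }

    public static async Task Main()
    {
        var (lazySame, listSame) = await RunAsync();
        Console.WriteLine($"lazy same instance: {lazySame}"); // False
        Console.WriteLine($"list same instance: {listSame}"); // True
    }
}
```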
In the first case, calling `enumerable.First()` again gives us a different task instance than the one we awaited in the line before it. In the second case, we get the same instance because the task was already materialized into a `List`.
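To connect this back to the question's code, here is a sketch that uses `IAsyncEnumerator<T>` directly. `Words()` and the class name are hypothetical; it assumes C# 8 async streams and relies on the fact that a compiler-generated async iterator's `Current` is `default` until `MoveNextAsync()` has been called. Advancing the enumerators produced by one enumeration of a deferred query has no effect on the ones the next enumeration produces:

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Illustrative names; Words() is a hypothetical stand-in for the question's selector.
public static class DeferredEnumeratorDemo
{
    static async IAsyncEnumerable<string> Words()
    {
        await Task.Yield();
        yield return "first";
        yield return "second";
    }

    public static async Task<(string? LazyCurrent, string? ListCurrent)> RunAsync()
    {
        // Deferred: each enumeration of this query calls GetAsyncEnumerator() again.
        IEnumerable<IAsyncEnumerator<string>> lazy =
            Enumerable.Range(0, 1).Select(_ => Words().GetAsyncEnumerator());

        // Advances an enumerator that is thrown away as soon as this loop ends.
        foreach (IAsyncEnumerator<string> e in lazy)
        {
            await e.MoveNextAsync();
        }
        // First() re-runs the selector: a brand-new enumerator that was never advanced.
        string? lazyCurrent = lazy.First().Current;

        // Materialized: the List holds the enumerators, so advancing them sticks.
        List<IAsyncEnumerator<string>> list =
            Enumerable.Range(0, 1).Select(_ => Words().GetAsyncEnumerator()).ToList();
        foreach (IAsyncEnumerator<string> e in list)
        {
            await e.MoveNextAsync();
        }
        string? listCurrent = list.First().Current;

        foreach (IAsyncEnumerator<string> e in list)
        {
            await e.DisposeAsync();
        }
        return (lazyCurrent, listCurrent);
    }

    public static async Task Main()
    {
        var (lazyCurrent, listCurrent) = await RunAsync();
        Console.WriteLine($"lazy: {lazyCurrent ?? "null"}"); // lazy: null
        Console.WriteLine($"list: {listCurrent ?? "null"}"); // list: first
    }
}
```

This mirrors the question: without `ToList()`, the `foreach` reads `Current` from enumerators that were created by that very enumeration and never advanced.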