I have several identical api services available on different clients to which I would like to send several http-requests. This data is can be transformed into a map (Lookup<TKey, TSource>).
Additionally, I also want to return the response from a service as soon as it is available throught a IAsyncEnumerable.
A naive implementation would be to run through all requests one by one. However, I would like to make this more efficient by starting n requests to x services concurrently. Note I want to limit/throttle n and x. In other words x is how many clients I communicate with at once. I will not start communication with a new client until all requests has been processed.
n define how many parallel open request I can have to one given client at once.
Is there a preferred way to do this or can it be done better than what I have come up with?
I have tried to make a solution by using TransformBlock and TransformManyBlock, but I am not very familiar with these objects thus any feedback is appreciated.
public static async IAsyncEnumerable<TOut> ParallelForEveryEntry<TKey, TSource, TOut>(
this IEnumerable<TSource> source,
Func<TSource, TKey> keySelector,
int n,
int x,
Func<TSource, Task<TOut>> callback,
CancellationToken token = default
) where TKey : notnull {
if (!source.Any()) {
yield break;
}
var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
TransformManyBlock<IGrouping<TKey, TSource>, TOut> blockCollection = new(collectionByEntry => {
var linkedToken = linkedTokenSource.Token;
TransformBlock<TSource, TOut> blockEntry = new(callback, new ExecutionDataflowBlockOptions() {
CancellationToken = linkedToken,
MaxDegreeOfParallelism = x,
EnsureOrdered = false
});
// Feed the block with input data
foreach (var entry in collectionByEntry) {
blockEntry.Post(entry);
}
blockEntry.Complete();
return blockEntry.ReceiveAllAsync().ToEnumerable();
}, new ExecutionDataflowBlockOptions() {
CancellationToken = token,
MaxDegreeOfParallelism = n,
EnsureOrdered = false
});
// Feed the block with input data
foreach (var collectionByEntry in source.ToLookup(keySelector)) {
blockCollection.Post(collectionByEntry);
}
blockCollection.Complete();
// Emit the output data as they become available
while (await blockCollection.OutputAvailableAsync()) {
while (blockCollection.TryReceive(out var item)) {
yield return item;
}
}
// Propagate possible exception (including cancellation)
await blockCollection.Completion;
}