I have a method that looks up an item asynchronously from a datastore;
class MyThing {}
Task<Try<MyThing>> GetThing(int thingId) {...}
I want to look up multiple items from the datastore, and wrote a new method to do this. I also wrote a helper method that will take multiple Try<T> and combine their results into a single Try<IEnumerable<T>>.
public static class TryExtensions
{
Try<IEnumerable<T>> Collapse<T>(this IEnumerable<Try<T>> items)
{
var failures = items.Fails().ToArray();
return failures.Any() ?
Try<IEnumerable<T>>(new AggregateException(failures)) :
Try(items.Select(i => i.Succ(a => a).Fail(Enumerable.Empty<T>())));
}
}
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var results = new List<Try<Things>>();
foreach (var id in ids)
{
var thing = await GetThing(id);
results.Add(thing);
}
return results.Collapse().Map(p => p.ToArray());
}
Another way to do it would be like this;
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var tasks = ids.Select(async id => await GetThing(id)).ToArray();
await Task.WhenAll(tasks);
return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}
The problem with this is that all the tasks will run in parallel and I don't want to hammer my datastore with lots of parallel requests. What I really want is to make my code functional, using monadic principles and features of LanguageExt. Does anyone know how to achieve this?
Update
Thanks for the suggestion @MatthewWatson, this is what it looks like with the SemaphoreSlim;
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var mutex = new SemaphoreSlim(1);
var results = ids.Select(async id =>
{
await mutex.WaitAsync();
try { return await GetThing(id); }
finally { mutex.Release(); }
}).ToArray();
await Task.WhenAll(tasks);
return tasks.Select(t => t.Result).Collapse().Map(Enumerable.ToArray);
return results.Collapse().Map(p => p.ToArray());
}
Problem is, this is still not very monadic / functional, and ends up with more lines of code than my original code with a foreach block.
In the "Another way" you almost achieved your goal when you called:
var tasks = ids.Select(async id => await GetThing(id)).ToArray();Except that Tasks doesn't run sequentially so you will end up with many queries hitting your datastore, which is caused by
.ToArray()andTask.WhenAll. Once you called.ToArray()it allocated and started the Tasks already, so if you can "tolerate" oneforeachto achieve the sequential tasks running, like this:Back to our
RunSequentially, I would like to make it more functional like this for example:But sadly this will still run kinda "Parallel" tasks.
So the final usage should be:
Another overkill functional solution is to get Lazy in a Queue recursively !!
Instead
GetThing, get a Lazy oneGetLazyThingthat returnsLazy<Task<Try<MyThing>>>simply by wrappingGetThing:Now using couple extensions/functions:
Finally:
Update
However if you don't like the fact that
EnqueueAllandRunQueueare not "pure", we can take the following approach with the sameLazytrickNow: