I am trying to write a Scheduler that allows Limited Concurrency. Here is what I have written.
    public class ThreeAtATimeScheduler : IScheduler
    {
        private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(3);

        public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            var scheduler = ThreadPoolScheduler.Instance;
            return new CompositeDisposable(
                Disposable.Create(() => _semaphore.Release()),
                _semaphore.WaitAsync()
                    .ContinueWith(t => scheduler.Schedule(state, action))
            );
        }
        ...
    }
And here is how I am trying to use it (In my use case I have to accumulate some messages before I can start working on a batch).
    var scheduler = new ThreeAtATimeScheduler();
    var subscription =
        Observable.Range(0, 100)
            .Buffer(TimeSpan.FromSeconds(1), 10)
            .SelectMany(x => x)
            .Select(x => Observable.Start(() =>
            {
                Console.WriteLine($"Performing for Thread {Thread.CurrentThread.ManagedThreadId}");
            }, scheduler))
            .Concat()
            .Subscribe();
My program shows the following output and ends after a few seconds.
Performing for Thread 3
Performing for Thread 5
Performing for Thread 8
I debugged and found that the semaphore is never released.
If there is a better way to write this scheduler, I would love to find out about it but I would also love to find out why mine isn't working.
My guess is that Dispose has to be called explicitly on the returned disposable, but the Observable.Start() implementation discards your disposable and returns its own empty one. The garbage collector never calls Dispose, so Release never happens.
Old answer
In my opinion, you do not need a custom scheduler here. You can limit the concurrency purely with Rx LINQ operators like this:
The example shows the sum calculated from input buffers.
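A minimal sketch of this approach, assuming the System.Reactive package is referenced. The class name Demo, the method name SumBatches, and the limit of 3 are only illustrative. Each buffer is wrapped in Observable.Defer so the work starts when Merge subscribes to it, and Merge(maxConcurrency) keeps at most that many inner observables subscribed at once:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

public static class Demo
{
    // Hypothetical helper: sums each buffer, running at most maxConcurrency sums at a time.
    public static IList<int> SumBatches(int maxConcurrency)
    {
        return Observable.Range(0, 100)
            .Buffer(TimeSpan.FromSeconds(1), 10)
            // Observable.Start begins work as soon as it is called, so it is wrapped in
            // Defer: the work only starts when Merge subscribes to the inner observable.
            .Select(batch => Observable.Defer(() => Observable.Start(() =>
            {
                Console.WriteLine($"Summing on thread {Thread.CurrentThread.ManagedThreadId}");
                return batch.Sum();
            })))
            // Merge(int) subscribes to at most maxConcurrency inner observables at once
            // and queues the rest until a running one completes.
            .Merge(maxConcurrency)
            .ToList()
            .Wait(); // blocks until every batch has been processed
    }

    public static void Main()
    {
        var sums = SumBatches(3);
        Console.WriteLine($"Total: {sums.Sum()}"); // 0 + 1 + ... + 99 = 4950
    }
}
```

Because Merge emits results as they complete, the per-batch sums may arrive in a different order than the buffers were produced.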
Then you can wrap .Select.Merge into your own extension method which accepts the lambda and degree of parallelism:
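One possible shape for such an extension method, again assuming System.Reactive. The names LimitedConcurrencyExtensions and SelectLimited are hypothetical and not part of the library:

```csharp
using System;
using System.Reactive.Linq;

public static class LimitedConcurrencyExtensions
{
    // Hypothetical operator: applies selector to each item on the default scheduler,
    // with at most maxConcurrency selectors running at the same time.
    public static IObservable<TResult> SelectLimited<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TResult> selector,
        int maxConcurrency)
    {
        return source
            // Defer delays Observable.Start until Merge subscribes to the inner observable.
            .Select(item => Observable.Defer(() => Observable.Start(() => selector(item))))
            // Limits the number of inner observables subscribed at once.
            .Merge(maxConcurrency);
    }
}
```

With this in place, the pipeline above could be written as source.Buffer(TimeSpan.FromSeconds(1), 10).SelectLimited(batch => batch.Sum(), 3). As with plain Merge, results are emitted in completion order rather than source order.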