Rx - Scheduler that only allows n number of parallel operations at a time


I am trying to write a scheduler that allows limited concurrency. Here is what I have written.

public class ThreeAtATimeScheduler : IScheduler
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(3);

    public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
    {
        var scheduler = ThreadPoolScheduler.Instance;
        return new CompositeDisposable(
            Disposable.Create(() => _semaphore.Release()),
            _semaphore.WaitAsync()
                .ContinueWith(t => scheduler.Schedule(state, action))
        );
    }

...
}

And here is how I am trying to use it (in my use case, I have to accumulate some messages before I can start working on a batch).

var scheduler = new ThreeAtATimeScheduler();
var subscription = Observable.Range(0, 100)
    .Buffer(TimeSpan.FromSeconds(1), 10)
    .SelectMany(x => x)
    .Select(x => Observable.Start(() =>
    {
        Console.WriteLine($"Performing for Thread {Thread.CurrentThread.ManagedThreadId}");
    }, scheduler))
    .Concat()
    .Subscribe();

My program shows the following output and ends after a few seconds.

Performing for Thread 3
Performing for Thread 5
Performing for Thread 8

I debugged and found that the semaphore is never released.

If there is a better way to write this scheduler, I would love to find out about it but I would also love to find out why mine isn't working.

My guess is that Dispose has to be called explicitly on the returned disposable, but the Observable.Start() implementation discards it and returns its own empty one. The garbage collector never calls Dispose, so Release never happens.

There are 2 best solutions below

Oleg Dok On

Old answer

In my opinion, you do not need a custom scheduler here. You can limit the concurrency purely with Rx LINQ operators, like this:

        Observable.Range(0, 100)
            .Buffer(10)
            .Select(x => Observable.FromAsync(() => Task.Run(() =>
            {
                Console.WriteLine($"Performing for Thread {Thread.CurrentThread.ManagedThreadId}");
                return x.Sum();
            })))
            .Merge(3) // <-- degree of parallelism
            .Subscribe(Console.WriteLine);
        Console.ReadLine();

The example prints the sum of each input buffer.

You can then wrap the .Select(...).Merge(...) pair in your own extension method that accepts the lambda and the degree of parallelism:

public static IObservable<TResult> Parallel<TSource, TResult>(this IObservable<TSource> input, Func<TSource, TResult> selector, int degreeOfParallelism) =>
    input
        .Select(x => Observable.FromAsync(() => Task.Run(() => selector(x))))
        .Merge(degreeOfParallelism);

static void Main()
{
  Observable.Range(0, 100)
    .Buffer(10)
    .Parallel(x =>
    {
      Console.WriteLine($"Performing for Thread {Thread.CurrentThread.ManagedThreadId}");
      return x.Sum();
    }, 3)
    .Subscribe(Console.WriteLine);
  Console.ReadLine();
}
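
To check that Merge(3) really caps the number of jobs in flight, one can count them while they run. The following is a minimal sketch and not part of the original answer: MergeLimitDemo and MeasurePeak are hypothetical names, and the 20 ms sleep only stands in for real work.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MergeLimitDemo
{
    // Hypothetical helper: runs 20 short jobs through Merge(limit) and returns
    // the highest number of jobs that were observed running at the same time.
    public static int MeasurePeak(int limit)
    {
        int running = 0, peak = 0;

        Observable.Range(0, 20)
            .Select(i => Observable.FromAsync(() => Task.Run(() =>
            {
                int now = Interlocked.Increment(ref running);

                // Raise the recorded peak atomically if this job pushed it higher.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen)
                {
                }

                Thread.Sleep(20); // simulated work
                Interlocked.Decrement(ref running);
                return i;
            })))
            .Merge(limit) // at most `limit` inner observables are subscribed at once
            .ToList()
            .Wait();      // block until every job has completed

        return peak;
    }

    public static void Main()
    {
        Console.WriteLine($"Peak concurrency: {MeasurePeak(3)}");
    }
}
```

MeasurePeak(3) should never report more than 3, because Merge subscribes to the next inner observable only after an earlier one has completed, and each job decrements the counter before its task finishes.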
Oleg Dok On

You can use the following approach, which uses the semaphore as a gate. You still do not need a custom scheduler, only a shared semaphore instance.

Note that this is still a proof of concept rather than a robust solution: it can leak a semaphore slot if the subscription is disposed at certain times.

    public static IObservable<TResult> Parallel<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector, SemaphoreSlim semaphore)
    {
        var gate = Observable
            .Never<Unit>()
            .StartWith(Unit.Default)
            .Do(_ => semaphore.Wait())
            .Finally(() => semaphore.Release())
            .SubscribeOn(Scheduler.Immediate)
            .ObserveOn(Scheduler.Immediate);

        return source
            .Select(x =>
            {
                var gated = gate.Subscribe();
                return (x, gated);
            })
            .Select(x => Observable.FromAsync(() => Task.Run(() =>
            {
                try
                {
                    return selector(x.x);
                }
                finally
                {
                    x.gated.Dispose();
                }
            })))
            .Merge();
    }


    static void Main()
    {
        SemaphoreSlim _semaphore = new SemaphoreSlim(3);

        Observable
            .Range(0, 1000000)
            .Buffer(10)
            .Parallel(x =>
            {
                Console.WriteLine($"Performing on Thread {Thread.CurrentThread.ManagedThreadId}, Semaphore state: {_semaphore.CurrentCount}");
                Thread.Sleep(10);
                return x.Sum();
            }, _semaphore)
            .Subscribe(Console.WriteLine);

        Console.ReadLine();
    }
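
One way to avoid the slot leak mentioned in the note might be to acquire and release the semaphore inside the same async lambda, so that a finally block pairs every successful wait with exactly one release. This is only a sketch under assumptions: SelectGated and GatedDemo are hypothetical names, and unlike the version above it does not block the source while waiting, so pending items queue up on the semaphore instead.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class GatedExtensions
{
    // Hypothetical operator, not part of Rx.
    public static IObservable<TResult> SelectGated<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TResult> selector,
        SemaphoreSlim semaphore) =>
        source
            .Select(x => Observable.FromAsync(async (CancellationToken ct) =>
            {
                // If the subscription is disposed while waiting, WaitAsync throws
                // before a slot is taken, so there is nothing to release.
                await semaphore.WaitAsync(ct);
                try
                {
                    return await Task.Run(() => selector(x), ct);
                }
                finally
                {
                    // Runs on success, failure, and cancellation alike.
                    semaphore.Release();
                }
            }))
            .Merge();
}

public static class GatedDemo
{
    public static void Main()
    {
        var semaphore = new SemaphoreSlim(3);

        var results = Observable.Range(0, 20)
            .SelectGated(i => i * 2, semaphore)
            .ToList()
            .Wait(); // block until all items have been processed

        Console.WriteLine($"{results.Count} results, free slots: {semaphore.CurrentCount}");
    }
}
```

The trade-off is that every source item creates a pending waiter immediately, so for a very large source it may be worth combining this with a bounded Merge overload.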