How to queue WhenAnyValue subscriber calls containing asynchronous code?

146 Views Asked by At

What is the correct way to prevent subscribers to be called in parallel, before previous call is completed?

I have kind of race condition atm with code like this

SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async someBool =>
{
    if(someBool)
    {
        await ...
        Start();
    }
    else
    {
        await ...
        Stop();
    }
});

If SomeBool changes rapidly, then it can happens that calls will be like this:

Start()
Stop()
Stop()
Start()

or worse. How can I ensure that it is always

Start()
Stop()
Start()
Stop()

I can put lock inside or use some kind of queue to ensure order of calls. But I hope there is something existing for situation like this or I rather need to use reactive concept correctly, e.g. creating a new observable or who knows what.


Forgot to add mcve. Create new console app, add nugets: ReactiveUI and ReactiveUI.Fody.

class Program
{
    static SomeReactive SomeReactive { get; } = new();

    static void Main(string[] args)
    {
        SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async someBool =>
        {
            if (someBool)
            {
                await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                Console.WriteLine("start");
            }
            else
            {
                await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                Console.WriteLine("stop");
            }
        });

        for (int i = 0; i < 10; i++)
        {
            SomeReactive.SomeBool = !SomeReactive.SomeBool;
            Thread.Sleep(50);
        }

        Console.ReadKey();
    }
}

class SomeReactive : ReactiveObject
{
    [Reactive]
    public bool SomeBool { get; set; }
}
4

There are 4 best solutions below

0
Sinatr On BEST ANSWER

Using this answer I've made my own SybscribeAsync extension method with parameter:

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> func) =>
            source.Select(o => Observable.FromAsync(_ => func(o)))
                .Concat()
                .Subscribe();

The method SubscribeAsync should be used instead of Subscribe(async o => ...), unless async void is not a problem (it could be).

P.S.: naming method SubscribeSynchronous like ReactiveMarbles does is the option.

3
Chris Pulman On

If I was trying to do this, I would use SubscribeSynchronus as follows: Add the nuget package - ReactiveMarbles.Extensions <PackageReference Include="ReactiveMarbles.Extensions" Version="1.1.12" /> then use as follows:

    using ReactiveMarbles.Extensions;

    class Program
    {
        static SomeReactive SomeReactive { get; } = new();
    
        static void Main(string[] args)
        {
            SomeReactive
            .WhenAnyValue(o => o.SomeBool)
            .SubscribeSynchronous(async someBool =>
            {
                if (someBool)
                {
                    await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                    Console.WriteLine("start");
                }
                else
                {
                    await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                    Console.WriteLine("stop");
                }
            });
    
            for (int i = 0; i < 10; i++)
            {
                SomeReactive.SomeBool = !SomeReactive.SomeBool;
                Thread.Sleep(50);
            }
    
            Console.ReadKey();
        }
    }
    
    class SomeReactive : ReactiveObject
    {
        [Reactive]
        public bool SomeBool { get; set; }
    }

Hopefully this works in your situation.

There is also SyncronizeAsync() which will add a Sync element to the stream call x.Sync.Dispose() to release the next value and x.Value is the upstream value. This gives more control as to when the next value will be released and allows other functionality to be added in between.

    using ReactiveMarbles.Extensions;

    class Program
    {
        static SomeReactive SomeReactive { get; } = new();
    
        static void Main(string[] args)
        {
            SomeReactive
            .WhenAnyValue(o => o.SomeBool)
            .SyncronizeAsync()
            .Subscribe(async someBool =>
            {
                if (someBool.Value)
                {
                    await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                    Console.WriteLine("start");
                }
                else
                {
                    await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                    Console.WriteLine("stop");
                }
                
                someBool.Sync.Dispose();
            });
    
            for (int i = 0; i < 10; i++)
            {
                SomeReactive.SomeBool = !SomeReactive.SomeBool;
                Thread.Sleep(50);
            }
    
            Console.ReadKey();
        }
    }
    
    class SomeReactive : ReactiveObject
    {
        [Reactive]
        public bool SomeBool { get; set; }
    }
3
Lukasz Szczygielek On

I returned to your question, and I have something promising:

SomeReactive.WhenAnyValue(o => o.SomeBool)
    .Select(flag => Observable.Create<Unit>(async _ =>
    {
        if (flag)
        {
            await Task.Delay(100);
            Console.WriteLine("Start - select + concat");
        }
        else
        {
            await Task.Delay(300);
            Console.WriteLine("Stop - select + concat");
        }
    }))
    .Concat()
    .Subscribe();

Select creates a new Observable in the pipeline - here, we have a logic. Next, we need to preserve order/not run in parallel, so Concat is used. At the end, Subscribe is needed.

During my investigation, I also made a less RX code with semaphore.

SemaphoreSlim semaphoreSlim = new(1);
SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async flag =>
{
    await semaphoreSlim.WaitAsync();

    try
    {
        if (flag)
        {
            await Task.Delay(100);
            Console.WriteLine("Start - semaphore");
        }
        else
        {
            await Task.Delay(300);
            Console.WriteLine("Stop - semaphore");
        }
    }
    finally
    {
        semaphoreSlim.Release();
    }
});
1
Progman On

You can use Task.Run() to execute the "inner" part of your Subscribe() call with the await/async keywords, but wait for its completion via GetAwaiter() and GetResult() synchronously inside your Subscribe() call (see How to call asynchronous method from synchronous method in C#?). The code could look like this:

public class Program
{       
    static SomeReactive SomeReactive { get; } = new();

    static void Main(string[] args)
    {
        
        SomeReactive
            .WhenAnyValue(o => o.SomeBool)
            .Subscribe(someBool => {
                    Task.Run(async ()  =>
                    {
                        if (someBool)
                        {
                            await Task.Delay((int)(Random.Shared.NextDouble() * 1000));
                            Console.WriteLine("start");
                            
                        }
                        else
                        {
                            await Task.Delay((int)(Random.Shared.NextDouble() * 1000));
                            Console.WriteLine("stop");                        
                        }
                    })
                    .GetAwaiter().GetResult();
            });

        for (int i = 0; i < 10; i++)
        {
            SomeReactive.SomeBool = !SomeReactive.SomeBool;
            Thread.Sleep(50);
        }

        Console.ReadKey();
    }
}

class SomeReactive : ReactiveObject
{
    [Reactive]
    public bool SomeBool { get; set; }
}

This will generate the following output:

stop
start
stop
start
stop
start
stop
start
stop
start
stop