How to wait for the subscriber to complete before the next OnNext?


I am very new to reactive programming and I am stuck at one point.

I am trying to trigger a method when my collection reaches a certain number of elements or when a certain amount of time has passed.

Purpose: I process events coming from a database, and I need to record where I left off after every 200 events, OR save the last processed event every 10 seconds, because it may take a long time for 200 events to arrive. But OnNext does not wait for the SaveCheckpoint method to finish and keeps accepting processed items. For the sake of data consistency, I need OnNext to wait until the checkpoint has been saved.

The code below simulates my design. How can I achieve this?

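using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

Subject<string> events = new Subject<string>();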

events
    .Buffer(TimeSpan.FromSeconds(10), 200)
    .SelectMany(async x => await SaveCheckpoint(x)).Subscribe();


async Task<bool> SaveCheckpoint(IList<string> i)
{
    var lastProcessedEvent = i.LastOrDefault();

    if (lastProcessedEvent != null)
    {
        //Save checkpoint to db or somewhere else
        await Task.Delay(1000);
        Console.WriteLine($"Saved checkpoint of : {lastProcessedEvent}");
    }
   
    return await Task.FromResult(true);
}


for (int j = 1; j < 10000; j++)
{
    Console.WriteLine("Event processed");
    // This call needs to wait until the subscriber has completed its task
    events.OnNext($"Event - {j}");
}

I am looking for a way to wait for the subscriber to finish before the next OnNext call.


There is 1 best solution below

Oleg Dok On BEST ANSWER

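In my opinion, you need to do this without async/await inside the pipeline. Subject<T>.OnNext calls its observers synchronously, so a subscriber that blocks on the task makes OnNext wait until the checkpoint has been saved, like this: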

static async Task Main()
{
    void TimestampedPrint(object o) => Console.WriteLine($"{DateTime.Now:HH.mm.ss.fff}: {o}");
    Subject<string> events = new Subject<string>();

    using var disposable = events
        .Buffer(TimeSpan.FromSeconds(1), 20)
        .Subscribe(x => SaveCheckpoint(x).GetAwaiter().GetResult());


    async Task SaveCheckpoint(IList<string> i)
    {
        var lastProcessedEvent = i.LastOrDefault();

        if (lastProcessedEvent != null)
        {
            //Save checkpoint to db or somewhere else
            await Task.Delay(100);
            TimestampedPrint($"Saved checkpoint of : {lastProcessedEvent}");
        }
    }


    for (int j = 1; j < 100; j++)
    {
        TimestampedPrint($"Event processed {j}");
        // When this call fills the buffer, it blocks until the subscriber has saved the batch
        events.OnNext($"Event - {j}");
    }

    TimestampedPrint("End");

    Console.ReadLine();

}

This produces the following output:

12.54.50.657: Event processed 1
12.54.50.693: Event processed 2
12.54.50.693: Event processed 3
12.54.50.693: Event processed 4
12.54.50.693: Event processed 5
12.54.50.693: Event processed 6
12.54.50.693: Event processed 7
12.54.50.693: Event processed 8
12.54.50.693: Event processed 9
12.54.50.693: Event processed 10
12.54.50.693: Event processed 11
12.54.50.693: Event processed 12
12.54.50.693: Event processed 13
12.54.50.693: Event processed 14
12.54.50.693: Event processed 15
12.54.50.693: Event processed 16
12.54.50.693: Event processed 17
12.54.50.693: Event processed 18
12.54.50.693: Event processed 19
12.54.50.693: Event processed 20
12.54.50.807: Saved checkpoint of : Event - 20
12.54.50.808: Event processed 21
12.54.50.808: Event processed 22
12.54.50.808: Event processed 23
12.54.50.808: Event processed 24
12.54.50.808: Event processed 25
12.54.50.808: Event processed 26
12.54.50.808: Event processed 27
12.54.50.808: Event processed 28
12.54.50.808: Event processed 29
12.54.50.808: Event processed 30
12.54.50.808: Event processed 31
12.54.50.808: Event processed 32
12.54.50.808: Event processed 33
12.54.50.808: Event processed 34
12.54.50.808: Event processed 35
12.54.50.808: Event processed 36
12.54.50.808: Event processed 37
12.54.50.808: Event processed 38
12.54.50.808: Event processed 39
12.54.50.808: Event processed 40
12.54.50.909: Saved checkpoint of : Event - 40
12.54.50.909: Event processed 41
12.54.50.909: Event processed 42
12.54.50.909: Event processed 43
12.54.50.909: Event processed 44
12.54.50.909: Event processed 45
12.54.50.909: Event processed 46
12.54.50.909: Event processed 47
12.54.50.909: Event processed 48
12.54.50.909: Event processed 49
12.54.50.909: Event processed 50
12.54.50.909: Event processed 51
12.54.50.909: Event processed 52
12.54.50.909: Event processed 53
12.54.50.909: Event processed 54
12.54.50.909: Event processed 55
12.54.50.909: Event processed 56
12.54.50.909: Event processed 57
12.54.50.909: Event processed 58
12.54.50.909: Event processed 59
12.54.50.909: Event processed 60
12.54.51.007: Saved checkpoint of : Event - 60
12.54.51.007: Event processed 61
12.54.51.007: Event processed 62
12.54.51.007: Event processed 63
12.54.51.007: Event processed 64
12.54.51.007: Event processed 65
12.54.51.007: Event processed 66
12.54.51.007: Event processed 67
12.54.51.007: Event processed 68
12.54.51.007: Event processed 69
12.54.51.007: Event processed 70
12.54.51.007: Event processed 71
12.54.51.007: Event processed 72
12.54.51.007: Event processed 73
12.54.51.007: Event processed 74
12.54.51.007: Event processed 75
12.54.51.007: Event processed 76
12.54.51.008: Event processed 77
12.54.51.008: Event processed 78
12.54.51.008: Event processed 79
12.54.51.008: Event processed 80
12.54.51.108: Saved checkpoint of : Event - 80
12.54.51.108: Event processed 81
12.54.51.108: Event processed 82
12.54.51.108: Event processed 83
12.54.51.108: Event processed 84
12.54.51.108: Event processed 85
12.54.51.108: Event processed 86
12.54.51.108: Event processed 87
12.54.51.108: Event processed 88
12.54.51.108: Event processed 89
12.54.51.108: Event processed 90
12.54.51.108: Event processed 91
12.54.51.108: Event processed 92
12.54.51.108: Event processed 93
12.54.51.108: Event processed 94
12.54.51.108: Event processed 95
12.54.51.108: Event processed 96
12.54.51.108: Event processed 97
12.54.51.108: Event processed 98
12.54.51.108: Event processed 99
12.54.51.108: End
12.54.52.212: Saved checkpoint of : Event - 99
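
In the output above, the checkpoint for Event - 99 is saved about a second after "End", because the final partial batch had to wait for the Buffer timer. If the producer knows it has finished, one option is to call OnCompleted on the subject, which makes Buffer flush whatever it is still holding. The following is only a minimal sketch under some assumptions: it uses the System.Reactive NuGet package, and the CheckpointDemo class, the Run method and the in-memory log are hypothetical names introduced for illustration, with Task.Delay standing in for the real checkpoint write.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class CheckpointDemo
{
    // Hypothetical stand-in for the real checkpoint write (database, file, ...).
    private static async Task SaveCheckpoint(IList<int> batch, List<string> log)
    {
        if (batch.Count == 0) return;          // an empty batch has nothing to checkpoint
        await Task.Delay(100);                 // simulated I/O
        log.Add($"saved {batch[batch.Count - 1]}");
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var events = new Subject<int>();

        // Blocking on the task inside the subscriber keeps the producer's
        // OnNext call from returning until the batch has been saved.
        using var subscription = events
            .Buffer(TimeSpan.FromSeconds(10), 2)
            .Subscribe(batch => SaveCheckpoint(batch, log).GetAwaiter().GetResult());

        for (var j = 1; j <= 5; j++)
        {
            events.OnNext(j);
            log.Add($"returned {j}");
        }

        // Completing the subject flushes the remaining partial batch
        // instead of leaving it to the 10 second timer.
        events.OnCompleted();
        log.Add("completed");
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

With a batch size of 2 and five events, I would expect each "saved" line to appear before the "returned" line of the OnNext call that filled the batch, and "saved 5" to appear before "completed". Keep in mind that blocking with GetAwaiter().GetResult() is fine in a console application, but it can deadlock in environments that have a synchronization context, such as UI applications.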