How to collect values every n seconds with a max count in each interval from a signal in ReactiveSwift?


I am looking for something like a combination of collect(every:on:skipEmpty:discardWhenCompleted:) and collect(count:) in ReactiveSwift. The resulting signal should send the accumulated values every n seconds as long as their count stays below the max count during that interval. If the count reaches the max count within an interval, the values should be sent immediately.

For example, with timeInterval = 2s and maxCount = 2:

  • interval 1: received two values [1, 2]; forward them at the end of interval 1
  • interval 2: received one value [3]; forward it at the end of interval 2
  • interval 3: received three values [5, 6, 7] (3 values > maxCount); forward [5, 6] immediately when 7 is received, and treat 7 as a value received in interval 4 (interval 3 ends early)
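
The rule in the example above can be modelled without ReactiveSwift as a small simulation over timestamped values. This is only a sketch of the intended semantics, not a ReactiveSwift operator: the names TimedValue, Batch and simulateBatches are hypothetical, the arrival times are made up to fit the three intervals, and it assumes that values arrive in time order, that empty intervals are skipped, and that leftover values are forwarded at the end of their interval.

```swift
struct TimedValue {
    let time: Double   // arrival time in seconds (made-up sample data)
    let value: Int
}

struct Batch: Equatable {
    let time: Double   // time at which the batch is forwarded
    let values: [Int]
}

/// Hypothetical helper: computes which batches would be forwarded, and when.
func simulateBatches(_ events: [TimedValue], interval: Double, maxCount: Int) -> [Batch] {
    precondition(interval > 0 && maxCount > 0)
    var batches: [Batch] = []
    var buffer: [Int] = []
    var windowStart = 0.0

    for event in events {
        // Close every interval that ended before this value arrived.
        while event.time >= windowStart + interval {
            windowStart += interval
            if !buffer.isEmpty {   // assumption: empty intervals are skipped
                batches.append(Batch(time: windowStart, values: buffer))
                buffer.removeAll()
            }
        }
        // The buffer is already full: forward it now and start a new interval here.
        if buffer.count == maxCount {
            batches.append(Batch(time: event.time, values: buffer))
            buffer.removeAll()
            windowStart = event.time
        }
        buffer.append(event.value)
    }
    // Assumption: leftovers are forwarded when their interval ends.
    if !buffer.isEmpty {
        batches.append(Batch(time: windowStart + interval, values: buffer))
    }
    return batches
}

let events = [
    TimedValue(time: 0.5, value: 1), TimedValue(time: 1.5, value: 2),
    TimedValue(time: 2.5, value: 3),
    TimedValue(time: 4.2, value: 5), TimedValue(time: 4.6, value: 6),
    TimedValue(time: 5.0, value: 7),
]
for batch in simulateBatches(events, interval: 2, maxCount: 2) {
    print(batch.time, batch.values)
}
// prints:
// 2.0 [1, 2]
// 4.0 [3]
// 5.0 [5, 6]
// 7.0 [7]
```

Note that [5, 6] goes out at 5.0, the moment 7 arrives, and the next interval then runs from 5.0 to 7.0 instead of from 6.0 to 8.0.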

Answer by Alexey Golikov (best answer)

extension SignalProducer {
    func collect(count: Int, every: DispatchTimeInterval, on scheduler: QueueScheduler) -> SignalProducer<[Value], Error> {
        SignalProducer<[Value], Error> { (observer: Signal<[Value], Error>.Observer, lifetime: Lifetime) in
            var collectedValues: [Value] = []
            
            let disposable = CompositeDisposable()
            
            disposable += self
                .observe(on: scheduler)
                .start(Signal<Value, Error>.Observer(value: { value in
                    collectedValues.append(value)
                    
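                    // Flush as soon as `count` values have been collected.
                    // The timer below keeps running and is not restarted.
                    if collectedValues.count == count {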
                        observer.send(value: collectedValues)
                        collectedValues.removeAll()
                    }
                },
                                                     failed: { error in
                    observer.send(error: error)
                },
                                                     completed: {
                    if collectedValues.count > 0 {
                        observer.send(value: collectedValues)
                        collectedValues.removeAll()
                    }
                    
                    observer.sendCompleted()
                },
                                                     interrupted: {
                    observer.sendInterrupted()
                }))
            
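            // Flush on every timer tick, even when nothing has been collected
            // (this is where the empty arrays in the output come from).
            disposable += SignalProducer<Date, Never>.timer(interval: every,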
                                                            on: scheduler)
            .observe(on: scheduler)
            .startWithValues { _ in
                observer.send(value: collectedValues)
                collectedValues.removeAll()
            }
            
            lifetime.observeEnded {
                disposable.dispose()
            }
        }
    }
}

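The producer collects values into an array and emits it either as soon as it holds count values or on every timer tick, whichever comes first. The timer is not reset by a count-triggered emission, and a tick emits whatever has been collected so far, even an empty array.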
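
To make the fixed-timer behaviour easier to follow, it can be traced with a plain Swift model of the code above. This is a sketch, not part of the answer's code: simulateFixedTimer and the arrival times (in seconds) are hypothetical, and it assumes a tick that coincides with a value is handled after that value.

```swift
/// Hypothetical model of the producer above: a fixed timer plus a count threshold.
func simulateFixedTimer(_ arrivals: [(time: Double, value: Int)],
                        count: Int,
                        every: Double,
                        completedAt: Double) -> [[Int]] {
    var output: [[Int]] = []
    var buffer: [Int] = []
    var nextTick = every

    for arrival in arrivals {
        // Fire every tick that is due before this value arrives.
        // A tick emits the buffer even when it is empty.
        while nextTick < arrival.time {
            output.append(buffer)
            buffer.removeAll()
            nextTick += every
        }
        buffer.append(arrival.value)
        // Count threshold reached: emit immediately, the timer keeps its schedule.
        if buffer.count == count {
            output.append(buffer)
            buffer.removeAll()
        }
    }
    // Ticks that fire between the last value and completion.
    while nextTick < completedAt {
        output.append(buffer)
        buffer.removeAll()
        nextTick += every
    }
    // On completion, leftovers are emitted only if there are any.
    if !buffer.isEmpty {
        output.append(buffer)
    }
    return output
}

let arrivals: [(time: Double, value: Int)] = [
    (1, 0), (2, 1), (3, 2), (4, 3), (6, 4), (7, 5), (11, 6), (12, 7), (13, 8),
]
print(simulateFixedTimer(arrivals, count: 3, every: 5, completedAt: 16))
// prints [[0, 1, 2], [3], [4, 5], [6, 7, 8], []]
```

The final empty array comes from the tick at 15 s, which still fires although the count-triggered emission at 13 s has already emptied the buffer.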

Test:

var counter = 0
let valuesGenerator = SignalProducer.timer(interval: .milliseconds(550), on: QueueScheduler.main)
    .filter { date in
        Calendar.current.component(.second, from: date) % 3 == 0
    }
    .map { _ in
        defer { counter += 1 }
        return counter
    }
    .take(first: 20)
    .promoteError(Error.self)
        
valuesGenerator
.collect(count: 3, every: .seconds(5), on: QueueScheduler.main)
.start(Signal<[Int], Error>.Observer(value: { values in
        print("valuesGenerator, date: \(Date()), values: \(values)")
    },
                                         failed: { error in
        print("valuesGenerator, date: \(Date()), error: \(error)")
    },
                                         completed: {
        print("valuesGenerator, date: \(Date()), completed")
    },
                                         interrupted: {
        print("valuesGenerator, date: \(Date()), interrupted")
    }))

Output:

valuesGenerator, date: 2024-02-16 16:37:51 +0000, values: [0, 1, 2]
valuesGenerator, date: 2024-02-16 16:37:51 +0000, values: [3]
valuesGenerator, date: 2024-02-16 16:37:56 +0000, values: [4, 5]
valuesGenerator, date: 2024-02-16 16:38:00 +0000, values: [6, 7, 8]
valuesGenerator, date: 2024-02-16 16:38:01 +0000, values: [9]
valuesGenerator, date: 2024-02-16 16:38:06 +0000, values: [10, 11, 12]
valuesGenerator, date: 2024-02-16 16:38:06 +0000, values: []
valuesGenerator, date: 2024-02-16 16:38:11 +0000, values: [13, 14]
valuesGenerator, date: 2024-02-16 16:38:15 +0000, values: [15, 16, 17]
valuesGenerator, date: 2024-02-16 16:38:16 +0000, values: []
valuesGenerator, date: 2024-02-16 16:38:21 +0000, values: [18, 19]
valuesGenerator, date: 2024-02-16 16:38:21 +0000, completed

Answer by Lubo

The question is hard to grasp, but I will try.

If you want to batch emitted values by count and time, you can use Project Reactor's bufferTimeout method (Java). See the documentation here: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#bufferTimeout-int-java.time.Duration-

An example:

void bufferTimeoutTry() throws InterruptedException {
    Flux.interval(Duration.ofMillis(157))
            .filter(time -> time > 20 && time < 38 || time % 5 == 0 || time % 17 == 0)
            .bufferTimeout(5, Duration.ofSeconds(1))
            .doOnNext(list -> {
                // each list holds the items buffered during a 1-second period, or at most 5 items
            })
            .subscribe(System.out::println);
    Thread.sleep(30000);
}

The output is a sequence of lists. Flux.interval generates sequential long values (0, 1, 2, 3, 4, 5, ...), which are filtered on the second line of the method (to get irregular arrivals) and then buffered. After buffering, the stream no longer carries individual longs but lists of longs.

[0, 5]
[10, 15]
[17, 20, 21, 22, 23]
[24, 25, 26, 27, 28]
[29, 30, 31, 32, 33]
[34, 35, 36, 37, 40]
[45, 50, 51]
[55, 60]
[65, 68, 70]
[75, 80]
[85, 90]
[95, 100]
[102, 105]
[110, 115]
[119, 120, 125]
[130, 135, 136]
[140, 145]
[150, 153, 155]
[160, 165]
[170, 175]
[180, 185]

Is this what you want?