Moving Window With Kotlin Flow

1.1k Views Asked by At

I am trying to create a moving window of data using Kotlin Flows. It can be achieved in RxKotlin using a buffer, but buffer is not the same using Flows.

RxKotlin has a buffer operator, periodically gathers items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time - buffer(count, skip)

Kotlin Flow has a buffer but that just runs a collector in a separate coroutine - buffer

Is there an existing operator in Flows that can achieve this?

1

There are 1 best solutions below

0
On

I think what you are looking for is not available in the Kotlinx Coroutines library but there is an open issue.

There is also a possible implementation in this comment which I will also include here:

fun <T> Flow<T>.windowed(size: Int, step: Int): Flow<List<T>> = flow {
    // check that size and step are > 0
    val queue = ArrayDeque<T>(size)
    val toSkip = max(step - size, 0) < if sbd would like to skip some elements before getting another window, by serving step greater than size, then why not?
    val toRemove = min(step, size)
    var skipped = 0
    collect { element ->
        if(queue.size < size && skipped == toSkip) {
            queue.add(element)
        }
        else if (queue.size < size && skipped < toSkip) {
        skipped++
    }

        if(queue.size == size) {
            emit(queue.toList())
            repeat(toRemove) { queue.remove() }
            skipped = 0
        }
    }
}