Kotlin coroutines: How to use Array channel with filter / map?

I need to insert a buffer of 10000 elements between the various channel processors.

produce() provides a way to configure the buffer size:

produce(capacity = 10_000) {
}

However, map and filter default to a rendezvous channel (capacity 0):

fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
    produce(context) { // No capacity specified, defaults to 0
        consumeEach {
            send(transform(it))
        }
    }

Is there a way to configure this? Currently I'm writing my own versions of these kotlinx.coroutines functions with a buffer, which isn't very elegant.

There is 1 answer below:

The only way is to provide your own map implementation with a capacity parameter:

fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
    produce(context, capacity = capacity) {
        consumeEach {
            send(transform(it))
        }
    }
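
The same pattern extends to filter, and the buffered stages can then be chained. The following is only a sketch under some assumptions: it targets a recent kotlinx.coroutines release in which produce is an extension on CoroutineScope, so the helpers take the source channel as a parameter instead of a receiver. The names mapBuffered, filterBuffered, and runPipeline are hypothetical and not part of the library.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not a library function): map with a configurable output buffer.
@OptIn(ExperimentalCoroutinesApi::class)
fun <E, R> CoroutineScope.mapBuffered(
    source: ReceiveChannel<E>,
    capacity: Int,
    transform: suspend (E) -> R
): ReceiveChannel<R> = produce(capacity = capacity) {
    // consumeEach cancels the source channel if this stage fails or is cancelled.
    source.consumeEach { send(transform(it)) }
}

// Hypothetical helper (not a library function): filter with a configurable output buffer.
@OptIn(ExperimentalCoroutinesApi::class)
fun <E> CoroutineScope.filterBuffered(
    source: ReceiveChannel<E>,
    capacity: Int,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(capacity = capacity) {
    source.consumeEach { if (predicate(it)) send(it) }
}

// Example pipeline: every stage is separated by a 10_000-element buffer.
@OptIn(ExperimentalCoroutinesApi::class)
fun runPipeline(): List<Int> = runBlocking {
    val numbers = produce<Int>(capacity = 10_000) { for (i in 1..10) send(i) }
    val evens = filterBuffered(numbers, capacity = 10_000) { it % 2 == 0 }
    val squares = mapBuffered(evens, capacity = 10_000) { it * it }

    val result = mutableListOf<Int>()
    for (value in squares) result += value
    result
}
```

Calling runPipeline() collects the squares of the even numbers from 1 to 10. Because each stage is launched in the enclosing scope, cancelling that scope cancels the whole pipeline.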

I've created an issue for this (https://github.com/Kotlin/kotlinx.coroutines/issues/841), but it won't be implemented soon. All channel operators may be reworked once cold streams are introduced, to make cancellation and scope hierarchies consistent between hot and cold data sources.
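
For reference, cold streams later shipped in kotlinx.coroutines as Flow, whose buffer operator accepts a capacity. A minimal sketch, assuming a kotlinx.coroutines version that includes Flow, of placing a buffer between stages (runFlowPipeline is a hypothetical name used for illustration):

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical example: each buffer(10_000) call inserts a channel of that
// capacity between the upstream and downstream operators.
fun runFlowPipeline(): List<Int> = runBlocking {
    (1..10).asFlow()
        .buffer(10_000)
        .filter { it % 2 == 0 }
        .buffer(10_000)
        .map { it * it }
        .toList()
}
```

Here the operators themselves take no capacity argument; buffering is expressed as a separate step in the chain.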