I need to insert a buffer of 10000 elements between the various channel processors.
produce() provides a way to configure the buffer size:
produce(capacity = 10_000) {
}
However, map and filter default to a rendezvous channel:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
    produce(context) { // No capacity specified, defaults to 0
        consumeEach {
            send(transform(it))
        }
    }
Is there a way to configure this? Currently I'm constructing my own versions of these stdlib functions with a buffer, which isn't very elegant.
The only way is to provide your own map implementation with a capacity parameter.

I've created an issue for this (https://github.com/Kotlin/kotlinx.coroutines/issues/841), but it won't be implemented soon. All channel operators may be reworked when cold streams are introduced, to make cancellation and scope hierarchies consistent between hot and cold data sources.
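For the custom map with a capacity parameter mentioned above, here is a minimal sketch. It assumes a recent kotlinx.coroutines release in which produce is an extension on CoroutineScope. The name mapBuffered and its scope parameter are hypothetical, chosen for illustration; they are not part of the library.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not a kotlinx.coroutines API): like map, but the
// output channel gets an explicit buffer instead of the rendezvous default.
@OptIn(ExperimentalCoroutinesApi::class)
fun <E, R> ReceiveChannel<E>.mapBuffered(
    scope: CoroutineScope,
    capacity: Int,
    transform: suspend (E) -> R
): ReceiveChannel<R> =
    scope.produce(capacity = capacity) {
        // consumeEach cancels the source channel if this block fails.
        this@mapBuffered.consumeEach { element ->
            send(transform(element))
        }
    }

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    // Source channel with a 10,000-element buffer.
    val numbers = produce<Int>(capacity = 10_000) {
        for (i in 1..5) send(i)
    }
    // The mapped channel is buffered as well.
    val squares = numbers.mapBuffered(this, capacity = 10_000) { it * it }
    for (square in squares) println(square)
}
```

The same pattern should carry over to filter: wrap the consuming loop in produce(capacity = ...) and only send the elements that pass the predicate.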