I am just starting to familiarize myself with Kotlin flows.
For this, I am using them to parse the contents of a binary file which I will simulate using the following flow:
fun testFlow() = flow {
println("Starting loop")
try {
for (i in 0..5) {
emit(i)
delay(100)
}
println("Loop has finished")
}
finally {
println("Finally")
}
}
Now, I need the file contents multiple times basically to extract different sets of information. However, I don't want to read the file twice, but only once.
As there doesn't seem to be a built-in mechanism to clone / duplicate a flow, I developed the following helper function:
interface MultiConsumeBlock<T> {
suspend fun subscribe(): Flow<T>
}
suspend fun <T> Flow<T>.multiConsume(capacity: Int = DEFAULT_CONCURRENCY, scope: CoroutineScope? = null, block: suspend MultiConsumeBlock<T>.() -> Unit) {
val channel = buffer(capacity).broadcastIn(scope ?: CoroutineScope(coroutineContext))
val context = object : MultiConsumeBlock<T> {
override suspend fun subscribe(): Flow<T> {
val subscription = channel.openSubscription()
return flow { emitAll(subscription) }
}
}
try {
block(context)
} finally {
channel.cancel()
}
}
which I then use like this (think about the analogy to the file: flow a
gets every record, flow b
only the first 3 records (="file header") and flow c
everything after the header):
fun main() = runBlocking {
val src = testFlow()
src.multiConsume {
val a = subscribe().map { it }
val b = subscribe().drop(3).map{ it + it}
val c = subscribe().take(3).map{ it * it}
mapOf("A" to a, "B" to b, "C" to c).map { task -> launch { task.value.collect{ println("${task.key}: $it")} } }.toList().joinAll()
}
}
Output:
Starting loop
A: 0
C: 1
A: 1
C: 2
A: 4
C: 3
A: 9
C: 4
A: 16
C: 5
B: 10
C: 6
B: 12
C: 7
B: 14
C: 8
B: 16
C: 9
B: 18
C: 10
B: 20
C: 11
Loop has finished
Finally
Which looks good so far.
However, am I am unsure if I am using Kotlin's flows correctly in this regard.
Am I opening myself up for Deadlocks, missed Exceptions etc.?
The documentation just states:
All implementations of the Flow interface must adhere to two key properties described in detail below:
- Context preservation.
- Exception transparency.
But I am unsure if that's the case for my implementation or if I am missing something.
Or maybe there is a better way alltogether?