There is the sourceFlow
and I use flatMapLatest
to produce new flow by value from sourceFlow
.
And there is no value in collect()
. This code prints nothing.
fun main() {
fun createNewFlow(): MutableSharedFlow<String> = MutableSharedFlow()
runBlocking {
val sourceFlow = MutableSharedFlow<Int>()
// flow for transform
val set = mapOf(
1 to createNewFlow(),
2 to createNewFlow(),
3 to createNewFlow()
)
launch {
// get flow from set by number from sourceFlow
val res = sourceFlow.flatMapLatest { set[it]!! }
res.collect {
println(it)
}
}
launch {
sourceFlow.emit(1)
set[1]!!.emit("1")
set[1]!!.emit("1")
sourceFlow.emit(2)
set[2]!!.emit("2")
set[2]!!.emit("2")
}
}
}
If I change flatMapLatest
to flatMapConcat
I see 1
twice, it is obviously because flow from set[1] is not completed, but I'm waiting for 1, 1, 2, 2, 3, 3
when I use flatMapLatest
. What is wrong with my sample?
UPD:
I just added delay to get result of println
.
So, this is really a race in the code
launch {
delay(3000)
sourceFlow.emit(1)
set[1]!!.emit("1")
set[1]!!.emit("1")
}
As @louis-wasserman noted in the comments, it is a race (though not a thread-unsafe data race).
flatMapLatest
internallycollect
s (and cancels) the sub-flows. By default, MutableSharedFlow does not replay elements before the subscriptions.So there is a race between the two coroutines. If the first coroutine becomes a subscriber after the second coroutine calls
emit
, no value will be received. You can specify thereplay
parameter to make sure thecollect
calls receive elements before the subscription.