I have the code below:
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.runBlocking

interface Listener {
    fun onGetData(data: Int)
    fun onClose()
}

class MyEmitter {
    var listener: Listener? = null
    fun sendData(data: Int) = listener?.onGetData(data)
    fun close() = listener?.onClose()
}

fun handleInput(myEmitter: MyEmitter) = channelFlow {
    myEmitter.listener = object : Listener {
        override fun onGetData(data: Int) { trySend(data) }
        override fun onClose() { close() }
    }
}

fun main(): Unit = runBlocking {
    val myEmitter = MyEmitter()
    handleInput(myEmitter).collect {
        println(it)
    }
    myEmitter.sendData(1)
    myEmitter.sendData(2)
    myEmitter.close()
}
Whenever I send data, e.g. myEmitter.sendData(1), the call does reach trySend(data), but the result is closed.
Why is it closed? How can I keep it open?
I think it's not documented terribly clearly, but just like the flow builder, the channelFlow's Flow is considered complete once the suspend lambda returns. Since all you are doing is setting a listener and not waiting around, it returns almost immediately. When a channel Flow completes, its channel is also closed.
If you want your channelFlow to stay open until the Flow is canceled, call awaitClose() at the end. This function suspends until the channel is closed, so it will hold your Flow open until it's canceled or the event in your listener closes the Channel.
If you are familiar with callbackFlow, it is a specialized version of channelFlow that enforces the awaitClose() call, because it is meant for waiting on a listener, so there's no reason you would ever not want to await. The awaitClose() block is also where you can deregister any listener you created inside the flow builder.
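As a rough illustration of the fix described above, here is a minimal sketch that reuses the Listener and MyEmitter types from the question and builds the flow with callbackFlow (channelFlow with the same awaitClose() call should behave the same way). The collectDemo helper, the use of async, and the yield loop that waits for the listener to be registered are assumptions added only so the demo runs end to end; they are not part of the original code.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

interface Listener {
    fun onGetData(data: Int)
    fun onClose()
}

class MyEmitter {
    var listener: Listener? = null
    fun sendData(data: Int) = listener?.onGetData(data)
    fun close() = listener?.onClose()
}

fun handleInput(myEmitter: MyEmitter): Flow<Int> = callbackFlow {
    myEmitter.listener = object : Listener {
        override fun onGetData(data: Int) { trySend(data) }
        override fun onClose() { close() }
    }
    // Suspend here until the channel is closed or the collector is cancelled,
    // then deregister the listener so the emitter no longer references the flow.
    awaitClose { myEmitter.listener = null }
}

// Hypothetical demo helper (not in the original): collects in a separate
// coroutine, because collection suspends until the flow completes.
fun collectDemo(): List<Int> = runBlocking {
    val myEmitter = MyEmitter()
    val collected = async { handleInput(myEmitter).toList() }
    // Demo-only wait: let the flow builder run until the listener is registered.
    while (myEmitter.listener == null) yield()
    myEmitter.sendData(1)
    myEmitter.sendData(2)
    myEmitter.close() // closes the channel, which completes the flow
    collected.await()
}

fun main() {
    println(collectDemo()) // prints [1, 2]
}
```

Note that once the flow stays open, collect (or toList) suspends until the flow completes, so the sendData calls can no longer sit after the collecting call in the same coroutine as they do in the question's main; here they run while a separate coroutine collects.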