How can I send items to a Kotlin Flow (like a BehaviorSubject)?

I want to know how to send/emit items to a Kotlin Flow. My use case is:

In the consumer/ViewModel/Presenter I can subscribe with the collect function:

fun observe() {
  coroutineScope.launch {
    // Collect every value the repository emits
    repository.observe().collect {
      println(it)
    }
  }
}

But the issue is on the Repository side. With RxJava we could use a BehaviorSubject, expose it as an Observable/Flowable, and emit new items like this:

behaviourSubject.onNext(true)

But whenever I build a new flow:

flow {

}

I can only collect. How can I send values to a flow?

There are 3 best solutions below

9
On BEST ANSWER

If you want to get the latest value on subscription/collection you should use a ConflatedBroadcastChannel:

private val channel = ConflatedBroadcastChannel<Boolean>()

This replicates a BehaviorSubject. To expose the channel as a Flow:

// Repository
fun observe(): Flow<Boolean> {
  return channel.asFlow()
}

Now, to send an event/value to that exposed Flow, simply send it to this channel.

// Repository
// send() is a suspending function, so the caller must be suspending too.
suspend fun someLogicalOp() {
  channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}

Console:

false

If you wish to only receive values after you start collecting you should use a BroadcastChannel instead.

To make it clear:

Behaves like Rx's PublishSubject

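private val channel = BroadcastChannel<Boolean>(1)

suspend fun broadcastChannelTest() = coroutineScope {
  // 1. Send event (there is no subscriber yet, so nobody receives it)
  channel.send(true)

  // 2. Start collecting in its own coroutine, because collect suspends
  //    until the channel is closed. UNDISPATCHED makes the collector
  //    subscribe before step 3 runs.
  launch(start = CoroutineStart.UNDISPATCHED) {
    channel
      .asFlow()
      .collect {
        println(it)
      }
  }

  // 3. Send another event
  channel.send(false)
}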

false

Only false gets printed as the first event was sent before collect { }.


Behaves like Rx's BehaviorSubject

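private val confChannel = ConflatedBroadcastChannel<Boolean>()

suspend fun conflatedBroadcastChannelTest() = coroutineScope {
  // 1. Send event (the channel keeps it as the latest value)
  confChannel.send(true)

  // 2. Start collecting in its own coroutine, because collect suspends
  //    until the channel is closed. UNDISPATCHED makes the collector
  //    subscribe before step 3 runs.
  launch(start = CoroutineStart.UNDISPATCHED) {
    confChannel
      .asFlow()
      .collect {
        println(it)
      }
  }

  // 3. Send another event
  confChannel.send(false)
}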

true

false

Both events are printed, because a new collector always gets the latest value first (if present).

Also worth mentioning is the Kotlin team's work on DataFlow (name pending), which seems better suited to this use case (as it will be a cold stream).

5
On

UPDATE:

Kotlin Coroutines 1.4.0 is now available with MutableSharedFlow, which replaces the need for Channel. MutableSharedFlow cleanup is also built in, so you don't need to manually open and close it, unlike Channel. Please use MutableSharedFlow if you need a Subject-like API for Flow.
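
For illustration, here is a minimal sketch of a PublishSubject-like repository built on MutableSharedFlow. It assumes kotlinx.coroutines 1.6 or newer, and EventRepository, events, send and sharedFlowDemo are hypothetical names invented for this example, not part of any library.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical repository; the class and member names are illustrative only.
class EventRepository {
    // replay = 0: a collector only sees values emitted after it subscribed.
    private val _events = MutableSharedFlow<Boolean>(replay = 0)

    // Expose a read-only view so that only the repository can emit.
    val events: SharedFlow<Boolean> = _events.asSharedFlow()

    suspend fun send(value: Boolean) = _events.emit(value)
}

fun sharedFlowDemo(): List<Boolean> = runBlocking {
    val repository = EventRepository()
    val received = mutableListOf<Boolean>()

    // 1. Emitted before anyone collects, so it is dropped.
    repository.send(true)

    // 2. UNDISPATCHED runs the collector up to its first suspension,
    //    so it is already subscribed when step 3 runs.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        repository.events.collect { received += it }
    }

    // 3. Delivered to the active collector.
    repository.send(false)

    // Cancelling the collector is all the cleanup this sketch needs.
    job.cancel()
    received
}

fun main() {
    println(sharedFlowDemo())
}
```

Creating the flow with replay = 1 instead should make a new collector receive the most recent value first, which is closer to a BehaviorSubject without an initial value.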

ORIGINAL ANSWER

Since your question had the android tag I'll add an Android implementation that allows you to easily create a BehaviorSubject or a PublishSubject that handles its own lifecycle.

This is relevant on Android because you don't want to forget to close the channel and leak memory. This implementation avoids the need to explicitly "dispose" of the reactive stream by tying it to the creation and destruction of the Fragment/Activity, similar to LiveData.

interface EventReceiver<Message> {
    val eventFlow: Flow<Message>
}

interface EventSender<Message> {
    fun postEvent(message: Message)
    val initialMessage: Message?
}

class LifecycleEventSender<Message>(
    lifecycle: Lifecycle,
    private val coroutineScope: CoroutineScope,
    private val channel: BroadcastChannel<Message>,
    override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {

    init {
        lifecycle.addObserver(this)
    }

    override fun postEvent(message: Message) {
        if (!channel.isClosedForSend) {
            coroutineScope.launch { channel.send(message) }
        } else {
            Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
        }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
    fun create() {
        channel.openSubscription()
        initialMessage?.let { postEvent(it) }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun destroy() {
        channel.close()
    }
}

class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
    EventReceiver<Message> {
    override val eventFlow: Flow<Message> = channel.asFlow()
}

abstract class EventRelay<Message>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    channel: BroadcastChannel<Message>,
    initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
    EventSender<Message> by LifecycleEventSender<Message>(
        lifecycle,
        coroutineScope,
        channel,
        initialMessage
    )

By using the Lifecycle library from Android, I can now create a BehaviorSubject that cleans itself up after the activity/fragment has been destroyed:

class BehaviorSubject(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage: String = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    ConflatedBroadcastChannel(),
    initialMessage
)

Or I can create a PublishSubject by using a buffered BroadcastChannel:

class PublishSubject(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage: String = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    BroadcastChannel(Channel.BUFFERED),
    initialMessage
)

And now I can do something like this:

class MyActivity: Activity() {

    val behaviorSubject = BehaviorSubject(
        this@MyActivity.lifecycle,
        this@MyActivity.lifecycleScope
    )

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        if (savedInstanceState == null) { 

            behaviorSubject.eventFlow
                .onEach { stringEvent ->
                    Log.d("BehaviorSubjectFlow", stringEvent)
                    // "BehaviorSubjectFlow: Initial Message"
                    // "BehaviorSubjectFlow: Next Message"
                }
                .flowOn(Dispatchers.Main)
                .launchIn(this@MyActivity.lifecycleScope)

        }
    }

    override fun onResume() {
        super.onResume()

        behaviorSubject.postEvent("Next Message")
    }
}
3
On

Take a look at the MutableStateFlow documentation, as it is a replacement for ConflatedBroadcastChannel, which is going to be deprecated very soon.

For more context, see the whole discussion in the original issue in Kotlin's repository on GitHub.
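
As a rough sketch of what that replacement could look like, here is a BehaviorSubject-like repository backed by MutableStateFlow. It assumes kotlinx.coroutines 1.6 or newer, and StateRepository, state, update and stateFlowDemo are hypothetical names made up for this example.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical repository; the class and member names are illustrative only.
class StateRepository {
    // A StateFlow always holds a value, so an initial one is required.
    private val _state = MutableStateFlow(true)

    // Expose a read-only view so that only the repository can update it.
    val state: StateFlow<Boolean> = _state.asStateFlow()

    // Assigning to value is not a suspending call, unlike Channel.send.
    fun update(newValue: Boolean) {
        _state.value = newValue
    }
}

fun stateFlowDemo(): List<Boolean> = runBlocking {
    val repository = StateRepository()
    val received = mutableListOf<Boolean>()

    // The collector receives the current value (true) as soon as it starts.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        repository.state.collect { received += it }
    }

    repository.update(false)
    yield() // give the collector a chance to observe the new value

    job.cancel()
    received
}

fun main() {
    println(stateFlowDemo())
}
```

One difference from a BehaviorSubject to keep in mind: StateFlow conflates values and skips an update that is equal to the current value, so assigning the same value twice does not notify collectors again.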