Understanding observer in monix


I'm reading the Monix documentation about observers and came across the following example:

Or you can quickly build an Observer that only logs the events that it receives. We’ll use this in other samples:

import monix.reactive.Observer

val out = Observer.dump("O")
// out: Observer.Sync[Any]

out.onNext(1)
//=> 0: O-->1
// res0: Ack = Continue

out.onNext(2)
//=> 1: O-->2
// res0: Ack = Continue

out.onComplete()
//=> 2: O completed

Yet the documentation then gives the following example and marks it as illegal:

Feeding two elements, then stopping. This is NOT legal:

// BAD SAMPLE
observer.onNext(1)
observer.onNext(2)
observer.onComplete()

Both examples use the same onNext -> onNext -> onComplete chain. Why is the second one not legal?

Best answer

In the documentation that you've linked, it's explained directly after the example:

This is the legal way of doing it:

observer.onNext(1).map {
  case Continue =>
    // We have permission to continue
    observer.onNext(2)
    // No back-pressure required here
    observer.onComplete()
    Stop
  case Stop =>
    // Nothing else to do
    Stop
}

As the comments show, the issue is back-pressure. So why does the documentation include an example using .dump that seems to be illegal?

Note the comments in that example:

//=> 0: O-->1
// res0: Ack = Continue

These comments are showing what you would get if you ran this in the Scala REPL. When you enter an expression and hit return, the REPL prints something like res0 and lets you know what the return value of the last command was.

So this example is demonstrating:

  • Feeding an Observer from the REPL
  • That each .onNext has completed with Continue

It wouldn't be correct to write a program that feeds an Observer in this way, but this is a correct transcript of a legal execution: each onNext had already returned Continue before the next call was entered.
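To do in a program what the REPL session does by hand, you can inspect the returned Ack before making the next call. The following is a minimal sketch, assuming Monix 3.x is on the classpath. It relies on Observer.dump returning an Observer.Sync, whose onNext returns a plain Ack rather than a Future[Ack]. The explicit type annotation and the name `first` are my own additions.

```scala
import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.reactive.Observer

// Explicit type so the element type is not left to inference
val out: Observer.Sync[Int] = Observer.dump("O")

// Observer.Sync#onNext returns the Ack directly, so it can be checked right away
val first: Ack = out.onNext(1)

if (first == Continue) {
  out.onNext(2)     // allowed: the previous onNext answered Continue
  out.onComplete()  // onComplete is not required to wait on an Ack
}
```

This only works for synchronous observers. For a general Observer the result is a Future[Ack], and you have to chain on it as in the legal example from the documentation.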

You can see the rules relating to back-pressure under the Contract section:

  1. Back-pressure: each onNext call MUST wait on a Continue result returned by the Future[Ack] of the previous onNext call.
  2. Back-pressure for onComplete and onError is optional: when calling onComplete or onError you are not required to wait on the Future[Ack] of the previous onNext.
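The two rules above can be sketched as a small recursive helper that feeds a whole list. This is an illustration under assumptions, not something taken from the Monix documentation: `feedAll` is a hypothetical name, and the code assumes Monix 3.x with the global Scheduler as the implicit ExecutionContext.

```scala
import monix.execution.Ack
import monix.execution.Ack.{Continue, Stop}
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observer
import scala.concurrent.Future

// Hypothetical helper: pushes items one at a time, honouring back-pressure.
// Completes with Continue if every item was accepted, Stop otherwise.
def feedAll[A](observer: Observer[A], items: List[A]): Future[Ack] =
  items match {
    case Nil =>
      // Rule 2: onComplete may be called without waiting on a further Ack
      observer.onComplete()
      Future.successful(Continue)
    case head :: tail =>
      // Rule 1: the next onNext happens only after the previous Ack is Continue
      observer.onNext(head).flatMap {
        case Continue => feedAll(observer, tail)
        case Stop     => Future.successful(Stop) // downstream asked us to stop
      }
  }

// Usage: feeds 1 and 2 to a logging observer, then completes it
val result: Future[Ack] = feedAll[Int](Observer.dump("O"), List(1, 2))
```

The recursion happens inside flatMap, so each step runs only once the previous Future[Ack] has completed. That is exactly the waiting the contract asks for.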

This is a good point to dwell on, as elegant back-pressure management is one of the big promises of reactive streams.