RxScala Observables with replay

80 Views Asked by At

I'm trying to understand replay in RxScala. I create an observable like this:

lazy val toyObservable : Observable[Int] = {
    val coldObservable : Observable[Int] = intPerSecond
    val hotObservable : ConnectableObservable[Int] = coldObservable.publish
    val cachedObservable = hotObservable //.replay(3)   //<<<<<<<<< ODD THING 
    cachedObservable.connect
    cachedObservable
}

where intPerSecond shoots out one integer per second, starting at 0. The first observer to subscribe indeed sees one integer per second. If the second observer joins in at t=6 seconds, then from that point they both see a matching stream 6...7...8...9... at one second intervals. That's as expected.

Now if I add in the .replay(3) I'd expect that when the second observer joins, he'd see 3456...7...8...9, ie he'd immediately get 3 integers from the cache, then receive them at one per second as they were produced. But instead, neither observer now sees anything. Do I have the syntax wrong?

1

There are 1 best solutions below

1
On BEST ANSWER

You forget to call hotObservable.connect. The following codes output exactly what you want:

import rx.lang.scala._
import rx.lang.scala.observables._
import scala.concurrent.duration._
val intPerSecond = Observable.interval(1.seconds).map(_.toInt)
val coldObservable : Observable[Int] = intPerSecond
val hotObservable : ConnectableObservable[Int] = coldObservable.publish
val cachedObservable = hotObservable.replay(3)
cachedObservable.connect
hotObservable.connect
cachedObservable.foreach(i => println(s"1: $i"))
Thread.sleep(6000)
cachedObservable.foreach(i => println(s"2: $i"))