Testing rx-observables from Futures/Iterables

179 Views Asked by At

I have:

val observable: Observable[Int] = Observable.from(List(5))

and I can test that the input list is indeed passed on to the observable by testing:

materializeValues(observable) should contain (5)

where materializeValues is:

def materializeValues[T](observable: Observable[T]): List[T] = {
  observable.toBlocking.toIterable.toList
}

Now, if I create an observable from a future, I can't seem to use materializeValues for the test as the test times out. So if I have:

val futVal = Future.successful(5)
val observable: Observable[Int] = Observable.from(futVal)
materializeValues(observable) should contain(5)

it times out and does not pass the test. What is different in the process of materializing these two observables, which leads to me not being able to block on it?

Also, what is the idomatic way of testing an observable? Is there any way of doing it without calling toBlocking?

1

There are 1 best solutions below

0
On BEST ANSWER

I think the problem is that you use AsyncWordSpecLike (by the way why AsyncWordSpecLike instead of AsyncWordSpec?). AsyncWordSpecLike/AsyncWordSpec are designed to simplify testing Future. Unfortunately Observable is a more powerful abstraction that can't be easily mapped onto a Future.

Particularly AsyncWordSpecLike/AsyncWordSpec allow your tests to return Future[Assertion]. To make it possible it provides custom implicit ExecutionContext that it can force to execute everything and know when all scheduled jobs have finished. However the same custom ExecutionContext is the reason why your second code doesn't work: processing of the scheduled jobs starts only after execution of your test code has finished but your code blocks on the futVal because unlucklily for you callback registered in Future.onComplete is scheduled to be run on the ExecutionContext. It means that you have a kind of dead-lock with your own thread.

I'm not sure what is the official way to test Observable on Scala. In Java I think TestSubscriber is the suggested tool. As I said Observable is fundamentally more powerful thing than Future so I think to test Observable you should avoid using AsyncWordSpecLike/AsyncWordSpec. If you switch to use FlatSpec or WordSpec, you can do something like this:

class MyObservableTestSpec extends WordSpec with Matchers {

  import scala.concurrent.ExecutionContext.Implicits.global
  val testValue = 5

  "observables" should {

    "be testable if created from futures" in {
      val futVal = Future.successful(testValue)
      val observable = Observable.from(futVal)

      val subscriber = TestSubscriber[Int]()
      observable(subscriber)
      subscriber.awaitTerminalEvent
      // now after awaitTerminalEvent you can use various subscriber.assertXyz methods
      subscriber.assertNoErrors
      subscriber.assertValues(testValue)
      // or you can use Matchers as 
      subscriber.getOnNextEvents should contain(testValue)
    }

  }

}