Making two independent service calls from Monix Observable

16 Views Asked by At

Versions:

Scala Version: 2.12.9, monix/eval: 2.3.2, monix/reactive: 2.3.2

Backdrop:

I have two services serviceA and serviceB to call from a Monix observable. The program is as follows.

import monix.reactive.subjects.PublishToOneSubject
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Ack.Continue
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSS");
def timestamp = formatter.format(LocalDateTime.now)
def log = s"[$timestamp] [${Thread.currentThread.getName}]"

println(s"$log Program Starts.")

private def serviceA(num: Int) = Task {
  println(s"$log A: ${num}")
  Thread.sleep(1000)
  num * 10
}

private def serviceB(num: Int) = Task {
  println(s"$log B: ${num}")
  Thread.sleep(5 * 1000)
  num * 0.5
}

private val aCtx = Scheduler.fixedPool("aCtx", 2)
private val bCtx = Scheduler.fixedPool("bCtx", 2)

val stream: PublishToOneSubject[Int] = PublishToOneSubject[Int]

stream
  .doOnSubscribe( () => println(s"$log Stream is ready!"))
  .doOnNext {
    v => println(s"$log Event: ${v}")
  }
  .mapAsync(parallelism=3) {
    v =>
      serviceA(v)
        .zip(serviceB(v).executeOn(bCtx))
        .map(_._1)
  }
  .subscribe(
    nextFn = { v =>
        println(s"$log Result: ${v}")
        Continue
    }
  )(aCtx)

(1 to 10).foreach {
  i => stream.onNext(i)
}


Thread.sleep(30 * 1000)
println(s"$log Program Ends.")

Question:

How to ensure that any latency in calling serviceB doesn't impact the stream and it continues to process events with serviceA and return results?

Though I need to call serviceB, I am only interested in result of serviceA. I don't want any slowness in calling serviceB to impact ServiceA. Using zip delays the result if there is a latency in serviceB call.

0

There are 0 best solutions below