Understanding monix consumer load balance

I am learning Monix 3.
The following code:

object Main extends TaskApp {
  override def runc = {
    Observable.fromIterable(1 to 10)
      .map{i =>
        val delay = Random.nextInt(1000) + 1000
        println(s"Starting $i, delay = $delay")
        Thread.sleep(delay)     // Imitation of hard execution
        i
      }
      .map{i =>
        val delay = Random.nextInt(1000) + 1000
        println(s"Continue $i, delay = $delay")
        Thread.sleep(delay)
        i
      }
      .consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))   //Compile error here
  }
}

leads to the compile error:

missing parameter type
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))

I can't figure out what's wrong here. How can I make this code compile?

UPD
A second question: how can I repeat this stream every n minutes?

There is 1 best solution below

To answer your first question: "missing parameter type" means the compiler could not infer the type of i in the lambda, so you have to give foreach an explicit type parameter:

Consumer.foreach[Int](i => println(s"End $i"))
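
For context, here is a minimal sketch of the whole pipeline with that fix applied. It assumes Monix 3.0.0 or later (where Task has runSyncUnsafe) and uses a plain main method with the global scheduler instead of TaskApp; the names LoadBalanceSketch, printEnd and pipeline are made up for illustration. The sleeps from the question are left out to keep it short.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}

object LoadBalanceSketch {
  // The explicit [Int] tells the compiler the lambda's parameter type,
  // which it could not infer on its own inside loadBalance(...)
  val printEnd: Consumer[Int, Unit] =
    Consumer.foreach[Int](i => println(s"End $i"))

  // loadBalance runs 3 copies of the consumer in parallel and
  // collects one result per worker, hence Task[List[Unit]]
  val pipeline: Task[List[Unit]] =
    Observable.fromIterable(1 to 10)
      .consumeWith(Consumer.loadBalance(3, printEnd))

  def main(args: Array[String]): Unit = {
    pipeline.runSyncUnsafe() // blocks until all 10 elements are consumed
    ()
  }
}
```

Note that the result type is Task[List[Unit]], not Task[Unit]. If the surrounding method expects a Task[Unit], the list can be discarded with .map(_ => ()).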

To answer your second question, use Observable.intervalAtFixedRate or Observable.intervalWithFixedDelay to emit a tick every n minutes, and run the stream once per tick.
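
One way this could look, as a rough sketch rather than the only approach: wrap a single pass over the stream in a Task, then trigger it from an interval observable. This assumes Monix 3.0.0 or later (for mapEval taking a Task); RepeatSketch, onePass and repeated are hypothetical names chosen for the example.

```scala
import scala.concurrent.duration._
import monix.eval.Task
import monix.reactive.{Consumer, Observable}

object RepeatSketch {
  // One full pass over the stream, with the per-worker results discarded
  val onePass: Task[Unit] =
    Observable.fromIterable(1 to 10)
      .consumeWith(
        Consumer.loadBalance(3, Consumer.foreach[Int](i => println(s"End $i"))))
      .map(_ => ())

  // Emits a tick immediately, then one every n minutes;
  // each tick runs onePass to completion before the next one is handled
  def repeated(n: Int): Observable[Unit] =
    Observable.intervalAtFixedRate(0.seconds, n.minutes)
      .mapEval(_ => onePass)
}
```

To run it, something like RepeatSketch.repeated(5).completedL gives a Task that never finishes on its own and can be started with a Scheduler in scope. As far as I understand the two variants, intervalAtFixedRate tries to keep the period between the starts of successive passes, while intervalWithFixedDelay waits the given delay after each pass has finished, so pick whichever matches what "every n minutes" should mean for you.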

Please refer to the Monix Scaladoc.

I hope this helps.