I am learning monix 3
.
The next code:
object Main extends TaskApp {
override def runc = {
Observable.fromIterable(1 to 10)
.map{i =>
val delay = Random.nextInt(1000) + 1000
println(s"Starting $i, delay = $delay")
Thread.sleep(delay) // Imitation of hard execution
i
}
.map{i =>
val delay = Random.nextInt(1000) + 1000
println(s"Continue $i, delay = $delay")
Thread.sleep(delay)
i
}
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i")))) //Compile error here
}
}
leads to the compile error:
missing parameter type
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))
I can't figure out, what's wrong here, and how to make this code compile?
UPD
The second question is how to repeat this stream every n
minutes?
As answer to your first question, you have to explicitly give a type parameter to
foreach
:To answer your second question, use
Observable.intervalAtFixedRate
orObservable.intervalAtFixedDelay
.Please refer to the Monix Scaladoc.
I hope this helps.