I'm trying to share a Ref[F, A] between 2 concurrent streams. Below is a simplified example of the actual scenario.
class Container[F[_]](implicit F: Sync[F]) {
  private val counter = Ref[F].of(0)
  def incrementBy2 = counter.flatMap(c => c.update(i => i + 2))
  def printCounter = counter.flatMap(c => c.get.flatMap(i => F.delay(println(i))))
}
In the main function,
object MyApp extends IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    val s = for {
      container <- Ref[IO].of(new Container[IO]())
    } yield {
      val incrementBy2 = Stream.repeatEval(
          container.get
            .flatTap(c => c.incrementBy2)
            .flatMap(c => container.update(_ => c))
        )
        .metered(2.second)
        .interruptScope
      val printStream = Stream.repeatEval(
          container.get
            .flatMap(_.printCounter)
        )
        .metered(1.seconds)
      incrementBy2.concurrently(printStream)
    }
    Stream.eval(s)
      .flatten
      .compile
      .drain
      .as(ExitCode.Success)
  }
}
The updates made by incrementBy2 are not visible in printStream.
How can I fix this?
I would appreciate any help to understand the mistake in this code.
Thanks
Your code is broken starting from the class definition: you are not even updating the same Ref.
Remember that the point of IO is to be a description of a computation, so Ref[F].of(0) returns a program that, when evaluated, will create a new Ref. Calling flatMap on it multiple times will result in multiple Refs being created. In your class, every call to incrementBy2 and printCounter evaluates that program again, so each one works on its own fresh Ref that starts at 0.
Also, you are not doing tagless final in the right way (and some may argue that even the right way is not worth it: https://alexn.org/blog/2022/04/18/scala-oop-design-sample/)
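To make the difference concrete, here is a small sketch, assuming Cats Effect 3. The names RefDemo, mkRef, broken and shared are made up for illustration and do not come from your code.

```scala
import cats.effect.{IO, IOApp, Ref}

object RefDemo extends IOApp.Simple {
  // A description of "allocate a Ref holding 0"; nothing is allocated yet.
  val mkRef: IO[Ref[IO, Int]] = Ref[IO].of(0)

  // Each flatMap on mkRef runs the allocation again, so the update and
  // the read happen on two different Refs.
  val broken: IO[Int] = for {
    _ <- mkRef.flatMap(_.update(_ + 2))
    i <- mkRef.flatMap(_.get)
  } yield i // evaluates to 0

  // Allocate once, then reuse the same Ref for both operations.
  val shared: IO[Int] = for {
    ref <- mkRef
    _   <- ref.update(_ + 2)
    i   <- ref.get
  } yield i // evaluates to 2

  def run: IO[Unit] = for {
    b <- broken
    s <- shared
    _ <- IO.println(s"broken = $b, shared = $s")
  } yield ()
}
```

The broken version has the same shape as your Container: counter is a program that creates a Ref, not a Ref.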
This is how I would write your code:
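A minimal sketch of such a rewrite, assuming Cats Effect 3 and fs2 3. It uses IO directly instead of F[_]; the Container.make factory, the stream names, and the interruptAfter(10.seconds) call (added only so the demo terminates) are illustrative choices, so treat this as one possible shape rather than the only way to do it.

```scala
import cats.effect.{IO, IOApp, Ref}
import fs2.Stream
import scala.concurrent.duration._

// Interface exposing only the operations; the Ref stays hidden inside.
trait Container {
  def incrementBy2: IO[Unit]
  def printCounter: IO[Unit]
}

object Container {
  // Building a Container is itself an effect, because it allocates the Ref.
  // The Ref is created exactly once per evaluation of `make`.
  def make: IO[Container] =
    Ref[IO].of(0).map { counter =>
      new Container {
        def incrementBy2: IO[Unit] = counter.update(_ + 2)
        def printCounter: IO[Unit] = counter.get.flatMap(i => IO.println(i))
      }
    }
}

object MyApp extends IOApp.Simple {
  def run: IO[Unit] =
    Stream
      .eval(Container.make) // allocate the shared state once
      .flatMap { container =>
        val incrementStream =
          Stream.repeatEval(container.incrementBy2).metered(2.seconds)
        val printStream =
          Stream.repeatEval(container.printCounter).metered(1.second)
        // Both streams close over the same Container, hence the same Ref.
        incrementStream.concurrently(printStream)
      }
      .interruptAfter(10.seconds) // illustrative: stop the demo after 10 seconds
      .compile
      .drain
}
```

Note that there is no need to wrap the Container in another Ref: the Container itself never changes, only the counter it holds does.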
You can see the code running here.