Cats Effect: Avoid calling unsafeRun on external listener of T => Unit

981 Views Asked by At

I am using external API which offers:

def listen(listener: ResponseType => Unit): Handle

My listener returns:

IO[Unit]

Is there a way to inject my listener without a need to call unsafeRun?

listen(msg => myListener(msg).unsafeRunSync())
1

There are 1 best solutions below

4
On

IO is not only container of your listener. From cats-effect documentation:

A pure abstraction representing the intention to perform a side effect, 
where the result of that side effect may be obtained synchronously (via return) 
or asynchronously (via callback)

it means IO contains not the some value but intention to perform some side-effect.

Here the most usual ways to work with IO:

  1. Add some effect to the containing of IO using flatMap (this way named composition):
val ioA: IO[A] = ??? // you have some side effect returning A
// here you can have some different side effect returning B: work with DB, read some resource or so on
def aToIOB(a: A): IO[B] = ??? 
val ioB: IO[B] = ioA.flatMap(aToIOB)
  1. Launching the whole chain of your combinations side effects from 1. using unsafeRunSync or similar functions which run IO. It should be called once in your application and as a rule in the end of the universe (at the end of your program).
// this line launches execution of ioA and aToIOB and will return value of B with side-effects
val b: B = ioB.unsafeRunSync() 

So when you see IO[A] you should remember that it is not the similar thing as Option[A] or List[A] and you can just execute IO to get A. You should use combination of IO while your application is not on the end of execution. If so you can run it using unsafeRunSync.

I mean you should build your application without using unsafeRunSync more than one time, and I guess, your signature

def listen(listener: ResponseType => Unit): Handle

should not use IO API, but very similar to Async[F[_]] API, take a look at function async:

/**
 * Creates a simple, non-cancelable `F[A]` instance that
 * executes an asynchronous process on evaluation.
 *
 * The given function is being injected with a side-effectful
 * callback for signaling the final result of an asynchronous
 * process.
 *
 * This operation could be derived from [[asyncF]], because:
 *
 * {{{
 *   F.async(k) <-> F.asyncF(cb => F.delay(k(cb)))
 * }}}
 *
 * As an example of wrapping an impure async API, here's the
 * implementation of [[Async.shift]]:
 *
 * {{{
 *   def shift[F[_]](ec: ExecutionContext)(implicit F: Async[F]): F[Unit] =
 *     F.async { cb =>
 *       // Scheduling an async boundary (logical thread fork)
 *       ec.execute(new Runnable {
 *         def run(): Unit = {
 *           // Signaling successful completion
 *           cb(Right(()))
 *         }
 *       })
 *     }
 * }}}
 *
 * @see [[asyncF]] for the variant that can suspend side effects
 *      in the provided registration function.
 *
 * @param k is a function that should be called with a
 *       callback for signaling the result once it is ready
 */
def async[A](k: (Either[Throwable, A] => Unit) => Unit): F[A]

Try to concentrate that listen should do in your application and choose correct signature.

Usefull links: