Processing async results with Arrow-kt and Kotlin

1.1k Views Asked by At

I have two async function calls to external systems returning Either<Exception, Something> and need to combine their results. Being a beginner in functional programming in Arrow-Kt, I am wondering which is the best way of accomplishing this task. Below is the code, which I am currently using. It certainly works but does not really "feel" to be most straight forward. I am looking for a more "functional" style to get the result. NB: the upfront usage of the successful List result is necessary.

suspend fun getAs(): Either<Exception, List<A>> = TODO()
suspend fun getBs(): Either<Exception, List<B>> = TODO()
suspend fun doSomethingWithA(listA: List<A>): Unit = TODO()

launch {
    val deferredA = async { getAs() }
    val deferredB = async { getBs() }

    either<Exception, List<A>> {
        val listOfAs = deferredA.await()
            .bimap(leftOperation = { e ->
                println("special message on error for A")
                e
            }, rightOperation = { listA ->
                doSomethingWithA(listA)
                listA
            })
            .bind()
        val listOfBs = deferredB.await().bind()

        listOfAs.filter { it.someId !in listOfBs.map { it.someProperty } }
    }
    .map { /* handle result */ }
    .handleError { /* handle error */ }

}

An alternative option would be to just use the map{} function like so

launch {
    val deferredA = async { getAs() }
    val deferredB = async { getBs() }

    deferredA.await()
        .bimap(leftOperation = { e ->
            println("special message on error for A")
            e
        }, rightOperation = { listA ->
            doSomethingWithA(listA)
            deferredB.await().map { listB ->
                listA.filter { a -> a.someId !in listB.map { it.someProperty } }
            }
        })
        .map { /* handle result */ }
        .handleError { /* handle error */ }
}
2

There are 2 best solutions below

0
On BEST ANSWER

The easiest way is to combine either { } with parZip. either { } allows you to extract A from Either<E, A>, and parZip is a utility function for running suspend function in parallel.

suspend fun getAs(): Either<Exception, List<A>> = TODO()
suspend fun getBs(): Either<Exception, List<B>> = TODO()
suspend fun doSomethingWithA(listA: List<A>): Unit = TODO()

either {
  val list = parZip(
    {
       getAs()
         .mapLeft { e -> println("special message on error for A"); e }
         .bind()
    },
    { getBs().bind() },
    { aas, bbs ->
      aas.filter { a -> a.someId !in bbs.map { it.someProperty }
    }
  )

  /* Work with list and return value to `either { } */
}.handleError { /* handle error */ }

Here bind() extracts A from Either<E, A>. We do this inside parZip such that whenever a Left gets encountered it short-circuits the either { } block and by doing so, it also cancels the still running tasks in parZip.

Such that if getAs() returns immediately with Left then it becomes the output value of either { } and getBs() gets cancelled.

1
On

I was about to post a very similar answer. Note that getAs and getBs are not really sequential, since getBs does not need the result of getAs to be executed. They only happen to need to combine results in the end. In other words: we can parallelize

Here you have some additional things I'd do on top of what Simon suggested. (I'll replace A's and B's in this example by NetworkUser and DbUser to try to give it some semantics, since otherwise those "id" properties on the filter will not work.

Capture errors and map them to strongly type domain errors on each of those effectful functions.

This will help to both remove burden from the rest of the program, and on top of it provide a safer domain error hierarchy that we can perform exhaustive evaluation on when required.

suspend fun <A> getUsersFromNetwork(): Either<DomainError, List<NetworkUser>> =
 Either.catch { fetchUsers() }
   .mapLeft { exception ->
     println("special message on error for A")
     exception.toDomain()
   }

Make the doSomething function return Either in case it can also fail.

This is a function you said it's required right after the initial get, which means flatMap or bind (they're equivalent). If we lift it into Either that'll ensure error short-circuiting takes place as expected, so this operation never runs in the initial one didn't succeed.

I suggest doing this because I suspect this operation you have here is also an effect in your code as a consequence of the first operation, probably something to store the result of the first operation in a local cache or another type of effect that is simply consuming that result.

suspend fun doSomethingWithNetworkUsers(listA: List<NetworkUser>): Either<DomainError, Unit> = TODO()

So our functions we'll rely on from the composed one can look like this:

suspend fun getUsersFromNetwork(): Either<DomainError, List<NetworkUser>> = TODO()
suspend fun getUsersFromDb(): Either<DomainError, List<DbUser>> = TODO()
suspend fun doSomethingWithNetworkUsers(listA: List<NetworkUser>): Either<DomainError, Unit> = TODO()

And the program:

fun CoroutineScope.program() {
  launch {
    either {
      parZip(
        {
          val networkUsers = getUsersFromNetwork().bind()
          doSomethingWithNetworkUsers(networkUsers).bind()
          networkUsers
        },
        { getUsersFromDb().bind() }
      ) { networkUsers, dbUsers ->
        networkUsers.filter { networkUser ->
          networkUser.id !in dbUsers.map { dbUser -> dbUser.id }
        }
      }
    }
    .map { /* do something with the overall result */ }
    .handleError { /* can recover from errors here */ }
    // Alternatively:
    // .fold(ifLeft = {}, ifRight = {}) for handling both sides.
  }
}

By doing the first operation as a composed one with binds first, like in the following snippet extracted from the one above, we make sure both operations are done before the parZip lambda to combine results takes place.

val networkUsers = getUsersFromNetwork().bind()
doSomethingWithNetworkUsers(networkUsers).bind()
networkUsers