Implementing mapAsync in Scala

I'm trying to implement a mapAsync method for general sequences. It works when called on its own, but it hangs when called inside a flatMap. Example code:

import akka.actor.ActorSystem

import java.util.concurrent.Semaphore
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object Test extends App {
  implicit class SeqWrapper[+A, M[X] <: TraversableOnce[X]](underlying: M[A]) {
    def mapAsyncSequential[B](
      f: A ⇒ Future[B]
    )(implicit ec: ExecutionContext, cbf: CanBuildFrom[M[Future[B]], B, M[B]]): Future[M[B]] = {
      underlying
        .foldLeft(Future.successful(cbf())) { (fr, fa) ⇒
          for {
            r ← fr
            a ← f(fa)
          } yield r += a
        }
        .map(_.result())
    }

    def mapAsyncParallel[B](
      f: A ⇒ Future[B]
    )(implicit ec: ExecutionContext, cbf: CanBuildFrom[M[Future[B]], B, M[B]]): Future[M[B]] = {
      underlying
        .map(f)
        .foldLeft(Future.successful(cbf())) { (fr, fb) ⇒
          for {
            r ← fr
            b ← fb
          } yield (r += b)
        }
        .map(_.result())
    }

    def mapAsync[B](
      concurrentInstances: Int
    )(f: A ⇒ Future[B])(implicit ec: ExecutionContext, cbf: CanBuildFrom[M[Future[B]], B, M[B]]): Future[M[B]] = {
      val sem = new Semaphore(concurrentInstances)

      // It still schedules a task for every element. It only throttles the start of the actual operation.
      // It is based on a semaphore, so waiting tasks block instead of busy-waiting.
      // Only the execution of the operation is delayed.
      // For ForkJoinPool execution-contexts it is fine, as it will spawn more threads when needed.
      // The semaphore is local to this call, so separate mapAsync calls are throttled independently.
      // Also, this is not super efficient, as it schedules 3 tasks for one element, but as said above,
      // it's fine for fork-join-pools.
      def throttled(current: A): Future[B] = Future {
        println("acquiring")
        sem.acquire()
        println("acquired")
      }.flatMap { _ ⇒
        println("operation")
        f(current).andThen {
          case _ ⇒
            println("release")
            sem.release()
        }
      }

      mapAsyncParallel(throttled)
    }
  }
  implicit class FutureValueProvider[T](f: Future[T]) {
    def futureValue: T = Await.result(f, Duration.Inf)
  }
  implicit val system: ActorSystem = ActorSystem("test")
  import system.dispatcher
  try {
    def f: Future[List[Int]] = {
      List(1, 2, 3, 4, 5).mapAsync(4) { scope =>
        Future.successful(scope)
      }
    }
    println(f.futureValue) // works
    println(Future.successful(1).flatMap(_ => f).futureValue) // gets stuck
  } finally system.terminate()
}

The first call works and prints the following output:

acquiring
acquiring
acquired
acquiring
acquired
operation
operation
acquiring
acquired
acquiring
acquired
operation
operation
release
release
acquired
release
release
operation
release
List(1, 2, 3, 4, 5)

But the second call gets stuck after printing the following output:

acquiring
acquired
acquiring
acquired
acquiring
acquired
acquiring
acquired
acquiring

Why would that happen? What am I missing here?
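
For comparison, here is a minimal sketch of throttling without blocking any thread. It assumes (this is not verified against the Akka dispatcher above) that the hang is related to sem.acquire() blocking a pool thread that is also expected to run the queued flatMap callbacks. The names ThrottledMapAsync and mapAsyncNonBlocking are hypothetical and not part of the code above, and the sketch works on Vector only instead of the generic CanBuildFrom signature. It starts `parallelism` worker chains; each chain takes the next unclaimed element only after its previous Future has completed, so at most `parallelism` calls to f are in flight.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper for illustration; not the code from the question.
object ThrottledMapAsync {
  def mapAsyncNonBlocking[A, B](items: Vector[A], parallelism: Int)(
    f: A => Future[B]
  )(implicit ec: ExecutionContext): Future[Vector[B]] = {
    require(parallelism > 0, "parallelism must be positive")

    // Index of the next element that no worker has claimed yet.
    val next = new AtomicInteger(0)

    // One worker chain: claim an index, run f, then continue via flatMap.
    // No thread waits here; the chain simply resumes when the Future completes.
    def worker(acc: List[(Int, B)]): Future[List[(Int, B)]] = {
      val i = next.getAndIncrement()
      if (i >= items.length) Future.successful(acc)
      else f(items(i)).flatMap(b => worker((i, b) :: acc))
    }

    // Start `parallelism` chains, then restore the original element order.
    Future
      .sequence(List.fill(parallelism)(worker(Nil)))
      .map(_.flatten.sortBy(_._1).map(_._2).toVector)
  }
}
```

With an implicit ExecutionContext in scope, ThrottledMapAsync.mapAsyncNonBlocking(Vector(1, 2, 3, 4, 5), 4)(x => Future.successful(x)) can be called directly or from inside a flatMap. A smaller change to try on the original code, also unverified for this setup, is wrapping sem.acquire() in scala.concurrent.blocking { ... }, which tells the execution context that the thread is about to block.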
