Why are Futures within Futures running sequentially when started on Akka Dispatcher

We observed a strange behavior when we tried to start a number of futures from within an actor's receive method. If we use our configured dispatchers as ExecutionContext, the futures run on the same thread and sequentially. If we use ExecutionContext.Implicits.global, the futures run in parallel as expected.

We boiled down the code to the following example (a more complete example is below):

implicit val ec = context.getDispatcher

Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }

Future {
   Future{ doWork() } 
   Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
   Future{ doWork() }
   Future{ doWork() }
}

A compilable example would be like this:

import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")   

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running sequentially. Why????
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(startFuture)
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }


}

We tried both thread-pool-executor and fork-join-executor, with the same result.

Are we using futures in the wrong way? How should we spawn parallel tasks instead?

There are 3 best solutions below

1
On BEST ANSWER

From the description of Akka's internal BatchingExecutor (emphasis mine):

Mixin trait for an Executor which groups multiple nested Runnable.run() calls into a single Runnable passed to the original Executor. This can be a useful optimization because it bypasses the original context's task queue and keeps related (nested) code on a single thread which may improve CPU affinity. However, if tasks passed to the Executor are blocking or expensive, this optimization can prevent work-stealing and make performance worse. ... A batching executor can create deadlocks if code does not use scala.concurrent.blocking when it should, because tasks created within other tasks will block on the outer task completing.
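
To make the quoted description more concrete, here is a minimal sketch of the batching idea using only the Scala and Java standard libraries. It is not Akka's BatchingExecutor; ToyBatchingContext and ToyBatchingDemo are hypothetical names invented for this illustration. Tasks submitted from inside a running task are queued in a thread-local batch and run on the same thread once the outer task returns, which is roughly why the nested futures in the question end up sequential.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors}
import scala.collection.mutable
import scala.concurrent.ExecutionContext

// Hypothetical, simplified batching context; NOT Akka's implementation.
final class ToyBatchingContext(underlying: ExecutionContext) extends ExecutionContext {
  // Holds the batch of the task currently running on this thread, if any.
  private val local = new ThreadLocal[mutable.Queue[Runnable]]

  override def execute(task: Runnable): Unit = {
    val batch = local.get()
    if (batch != null) {
      // Nested submission: keep the task on the current thread.
      batch.enqueue(task)
    } else {
      // Top-level submission: hand a new batch to the underlying pool.
      underlying.execute(new Runnable {
        override def run(): Unit = runBatch(task)
      })
    }
  }

  private def runBatch(first: Runnable): Unit = {
    val batch = mutable.Queue[Runnable](first)
    local.set(batch)
    try {
      // Run the outer task, then everything it submitted, one after another.
      while (batch.nonEmpty) batch.dequeue().run()
    } finally local.remove()
  }

  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

object ToyBatchingDemo {
  // Starts four tasks from inside another task and returns the names
  // of the threads that executed the four nested tasks.
  def nestedThreadNames(): Set[String] = {
    val pool  = Executors.newFixedThreadPool(4)
    val ec    = new ToyBatchingContext(ExecutionContext.fromExecutor(pool))
    val names = new ConcurrentLinkedQueue[String]()
    val done  = new CountDownLatch(4)

    ec.execute(new Runnable {
      override def run(): Unit = (1 to 4).foreach { _ =>
        ec.execute(new Runnable {
          override def run(): Unit = {
            names.add(Thread.currentThread().getName)
            done.countDown()
          }
        })
      }
    })

    done.await()
    pool.shutdown()
    names.toArray(new Array[String](0)).toSet
  }
}
```

Although the pool has four threads, all four nested tasks in this toy run on the single thread that executed the outer task, so ToyBatchingDemo.nestedThreadNames() contains exactly one name.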

If you're using a dispatcher that mixes in BatchingExecutor (namely, a subclass of MessageDispatcher), you could use the scala.concurrent.blocking construct to enable parallelism with nested Futures:

Future {
  Future {
    blocking {
      doBlockingWork()
    }
  }
}

In your example, you would add blocking in the startFuture method:

private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
  blocking {
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}

Sample output from running startFutures(true)(actorSystem.dispatcher) with the above change:

Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4
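
For readers wondering why blocking makes a difference: blocking { ... } does not start a thread by itself. It notifies the BlockContext installed on the current thread, and an executor can use that notification to react. As far as I understand, a batching executor uses this hook to hand its still-pending batched tasks back to the underlying pool. The sketch below shows only the hook, with the standard library alone; BlockingHookDemo and the counter are hypothetical names for this illustration.

```scala
import scala.concurrent.{BlockContext, CanAwait, blocking}

object BlockingHookDemo {
  // Returns (how often the hook was notified, result of the blocking body).
  def run(): (Int, Int) = {
    var notified = 0

    // A custom BlockContext that records each notification and then
    // simply runs the body on the current thread.
    val hook = new BlockContext {
      override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
        notified += 1 // assumption: a batching executor would resubmit pending tasks here
        thunk
      }
    }

    // Install the hook for the duration of the body on this thread.
    val result = BlockContext.withBlockContext(hook) {
      blocking { 21 * 2 }
    }
    (notified, result)
  }
}
```

Calling BlockingHookDemo.run() yields (1, 42): the hook is notified once, and the body's value is passed through unchanged.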
1
On

It has to do with the "throughput" setting of the dispatcher. I added a "fair-dispatcher" to application.conf to demonstrate this:

fair-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 1
}

Here is your example with a few modifications: it uses the fair dispatcher for the nested Futures and prints the current value of the throughput setting for both dispatchers:

package com.test

import akka.actor.ActorSystem

import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")

  println("Default dispatcher throughput:")
  println(actorSystem.dispatchers.defaultDispatcherConfig.getInt("throughput"))

  println("Fair dispatcher throughput:")
  println(actorSystem.dispatchers.lookup("fair-dispatcher").configurator.config.getInt("throughput"))

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running sequentially. Why????
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        implicit val fairExecutionContext = actorSystem.dispatchers.lookup("fair-dispatcher")
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(i => startFuture(i)(fairExecutionContext))
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}

Output:

Default dispatcher throughput:
5
Fair dispatcher throughput:
1
Future 12 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 11 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 13 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 14 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 16 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 15 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 17 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 18 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 19 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 13 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 11 finished on thread Experimental-akka.actor.default-dispatcher-4
Future 12 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 14 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 16 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 15 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 17 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 18 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 19 finished on thread Experimental-akka.actor.default-dispatcher-10
Start Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 1 should run for 500 millis on thread Experimental-fair-dispatcher-12
Future 2 should run for 500 millis on thread Experimental-fair-dispatcher-13
Future 4 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 3 should run for 500 millis on thread Experimental-fair-dispatcher-14
Future 5 should run for 500 millis on thread Experimental-fair-dispatcher-17
Future 6 should run for 500 millis on thread Experimental-fair-dispatcher-16
Future 7 should run for 500 millis on thread Experimental-fair-dispatcher-18
Future 8 should run for 500 millis on thread Experimental-fair-dispatcher-19
Started Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 4 finished on thread Experimental-fair-dispatcher-15
Future 2 finished on thread Experimental-fair-dispatcher-13
Future 1 finished on thread Experimental-fair-dispatcher-12
Future 9 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 5 finished on thread Experimental-fair-dispatcher-17
Future 7 finished on thread Experimental-fair-dispatcher-18
Future 8 finished on thread Experimental-fair-dispatcher-19
Future 6 finished on thread Experimental-fair-dispatcher-16
Future 3 finished on thread Experimental-fair-dispatcher-14
Future 9 finished on thread Experimental-fair-dispatcher-15

As you can see, fair-dispatcher uses different threads for most of the futures.

The default dispatcher is optimized for actors, so its throughput is set to 5. This minimizes context switches and improves message-processing throughput while maintaining some degree of fairness.

The only change in my fair-dispatcher is throughput: 1, i.e. each async execution request is given its own thread if possible (up to parallelism-max).

I'd recommend creating separate dispatchers for futures used for different purposes, e.g. one dispatcher (i.e. thread pool) for calling web services, another one for blocking DB access, and so on. This gives you more precise control, because you can tune the settings of each custom dispatcher independently.
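
As a rough, Akka-free analogue of that recommendation, the sketch below creates two separately sized and separately named thread pools and wraps each in an ExecutionContext. With Akka you would instead define two dispatchers in application.conf and obtain them via actorSystem.dispatchers.lookup, as in the code above. DedicatedPools, namedPool, the "web" and "db" prefixes, and the pool sizes are all assumptions made for this example.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object DedicatedPools {
  // Builds a fixed-size pool whose threads are named "<prefix>-<n>".
  private def namedPool(prefix: String, size: Int): ExecutionContextExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true) // do not keep the JVM alive because of idle workers
        t
      }
    }
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(size, factory))
  }

  // One pool per kind of work, so slow DB calls cannot starve web calls.
  val webEc: ExecutionContextExecutorService = namedPool("web", 4)
  val dbEc: ExecutionContextExecutorService  = namedPool("db", 2)

  // Runs a trivial future on the given context and reports the worker thread's name.
  def threadNameOn(ec: ExecutionContext): String =
    Await.result(Future(Thread.currentThread().getName)(ec), 5.seconds)
}
```

Passing the context explicitly, as in Future(...)(DedicatedPools.dbEc), keeps it visible at each call site which pool a piece of work is meant to run on.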

Take a look at https://doc.akka.io/docs/akka/current/dispatchers.html, it is really useful for understanding the details.

Also check out the Akka reference settings (default-dispatcher in particular), there are a bunch of useful comments over there: https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/reference.conf

0
On

After some research I found out that the Dispatcher class implements akka.dispatch.BatchingExecutor. For performance reasons, this class checks which tasks should be batched on the same thread. Future.map internally creates a scala.concurrent.OnCompleteRunnable, which is batched in the BatchingExecutor.

This seems to be reasonable for map() / flatMap() where one task generates one subsequent task, but not for explicit new Futures which are used to fork work. Internally, Future.apply is implemented by Future.successful().map and is thus batched. My workaround is now to create futures in a different way:

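import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

object MyFuture {
  // Submits the body as a plain Runnable instead of going through Future.apply.
  def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    class FuturesStarter extends Runnable {
      // Completes the promise with the body's result, or with the exception it threw.
      override def run(): Unit = {
        promise.complete(Try(body))
      }
    }
    executor.execute(new FuturesStarter)
    promise.future
  }
}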

The FuturesStarter Runnables are not batched and thus run in parallel.

Can anybody confirm that this solution is okay? Are there better ways to resolve this issue? Is the current behavior of Future / BatchingExecutor intended, or is it a bug?