Akka: pattern for combining messages from multiple children


Here's the pattern I have come across:

An actor A has multiple children C1, ..., Cn. On receiving a message, A sends it to each of its children; each child does some calculation on the message and, on completion, sends the result back to A. A would then like to combine the results from all the children and pass the combined result on to another actor.

What would a solution for this problem look like? Or is this an anti-pattern? In which case how should this problem be approached?

Here is a trivial example which hopefully illustrates my current solution. My concerns are that it duplicates code (up to symmetry), does not extend very well to 'lots' of children, and makes it quite hard to see what's going on.

import akka.actor.{Props, Actor}

case class Tagged[T](value: T, id: Int)

class A extends Actor {
  import C1._
  import C2._

  val c1 = context.actorOf(Props[C1], "C1")
  val c2 = context.actorOf(Props[C2], "C2")
  var uid = 0
  var c1Results = Map[Int, Int]()
  var c2Results = Map[Int, Int]()

  def receive = {
    case n: Int => {
      c1 ! Tagged(n, uid)
      c2 ! Tagged(n, uid)
      uid += 1
    }
    case Tagged(C1Result(n), id) => c2Results get id match {
      case None => c1Results += (id -> n)
      case Some(m) => {
        c2Results -= id
        context.parent ! (n, m)
      }
    }
    case Tagged(C2Result(n), id) => c1Results get id match {
      case None => c2Results += (id -> n)
      case Some(m) => {
        c1Results -= id
        context.parent ! (m, n)
      }
    }
  }
}

class C1 extends Actor {
  import C1._

  def receive = {
    // Send the tagged result back to whoever sent the request (here, A)
    case Tagged(n: Int, id) => sender() ! Tagged(C1Result(n), id)
  }
}

object C1 {
  case class C1Result(n: Int)
}

class C2 extends Actor {
  import C2._

  def receive = {
    // Send the tagged result back to whoever sent the request (here, A)
    case Tagged(n: Int, id) => sender() ! Tagged(C2Result(n), id)
  }
}    

object C2 {
  case class C2Result(n: Int)
}

If you think the code looks god-awful, take it easy on me, I've just started learning akka ;)

There are 2 answers below.

Best answer

In the case of many - or a varying number of - child actors, the ask pattern suggested by Zim-Zam will quickly get out of hand.

The aggregator pattern is designed to help with this kind of situation. It provides an Aggregator trait that you can use in an actor to perform your aggregation logic.

A client actor wanting to perform an aggregation can start an Aggregator-based actor instance and send it a message that will kick off the aggregation process.

A new aggregator should be created for each aggregation operation and terminate on sending back the result (when it has received all responses or on a timeout).

An example of this pattern to sum integer values held by the actors represented by the Child class is listed below. (Note that there is no need for them to all be children supervised by the same parent actor: the SummationAggregator just needs a collection of ActorRefs.)

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import akka.actor._
import akka.contrib.pattern.Aggregator

object Child {
  def props(value: Int): Props = Props(new Child(value))

  case object GetValue
  case class GetValueResult(value: Int)
}

class Child(value: Int) extends Actor {
  import Child._

  def receive = { case GetValue => sender ! GetValueResult(value) }
}

object SummationAggregator {
  def props = Props(new SummationAggregator)

  case object TimedOut
  case class StartAggregation(targets: Seq[ActorRef])
  case object BadCommand
  case class AggregationResult(sum: Int)
}

class SummationAggregator extends Actor with Aggregator {
  import Child._
  import SummationAggregator._

  expectOnce {
    case StartAggregation(targets) =>
      // Could do what this handler does in line but handing off to a 
      // separate class encapsulates the state a little more cleanly
      new Handler(targets, sender())
    case _ =>
      sender ! BadCommand
      context stop self
  }

  class Handler(targets: Seq[ActorRef], originalSender: ActorRef) {
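    // Could just store a running total and keep track of the number of responses
    // that we are awaiting. A buffer is used rather than a Set so that equal
    // values from different children are counted separately.
    val valueResults = scala.collection.mutable.ArrayBuffer.empty[GetValueResult]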

    context.system.scheduler.scheduleOnce(1.second, self, TimedOut)

    expect {
      case TimedOut =>
        // It might make sense to respond with what we have so far if some responses are still awaited...
        respondIfDone(respondAnyway = true)
    }

    if (targets.isEmpty)
      respondIfDone()
    else
      targets.foreach { t =>
        t ! GetValue
        expectOnce {
          case vr: GetValueResult =>
            valueResults += vr
            respondIfDone()
        }
      }

    def respondIfDone(respondAnyway: Boolean = false) = {
      if (respondAnyway || valueResults.size == targets.size) {
        originalSender ! AggregationResult(valueResults.foldLeft(0) { case (acc, GetValueResult(v)) => acc + v })
        context stop self
      }
    }
  }
}

To use this SummationAggregator from your parent actor you could do:

context.actorOf(SummationAggregator.props) ! StartAggregation(children)

and then handle AggregationResult somewhere in the parent's receive.
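The comments in Handler mention an alternative to storing every response: keep a running total and a count of the replies still outstanding. A minimal sketch of that bookkeeping in plain Scala might look like the following. Tally is a hypothetical name used only for illustration and is not part of the Aggregator trait.

```scala
// Hypothetical bookkeeping helper (not part of Akka): tracks how many
// replies are still awaited and the sum of the values received so far.
final case class Tally(awaiting: Int, sum: Int = 0) {
  require(awaiting >= 0, "awaiting must not be negative")

  // Record one reply; a new Tally is returned rather than mutating this one.
  def record(value: Int): Tally = {
    require(awaiting > 0, "no replies are awaited")
    copy(awaiting = awaiting - 1, sum = sum + value)
  }

  // True once every expected reply has been recorded.
  def isComplete: Boolean = awaiting == 0
}
```

In Handler this could stand in for valueResults: start with var tally = Tally(targets.size), call tally = tally.record(vr.value) for each GetValueResult, and respond with AggregationResult(tally.sum) once tally.isComplete holds or the timeout fires. Because only a count is kept, two children holding the same value are still counted separately.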

Another answer

You can use ? (ask) instead of ! on the child actors. Each ask returns a Future to the parent that will eventually hold the child's reply, so everything stays non-blocking unless you Await the outcome of the Future. The parent actor can then compose these Futures and send the combined result on to another actor. Because the parent holds a separate Future for each child, it already knows which reply is which, so you won't need to tag each message in order to match the replies up later. Here's a simple example where each child returns a random Double, and you want to divide the first child's return value by the second child's return value (i.e. order matters).

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.{Props, Actor}
import akka.pattern.{ask, pipe}
import akka.util.Timeout

class A extends Actor {
  // Execution context for the Future combinators below
  import context.dispatcher

  val c1 = context.actorOf(Props[C], "C1")
  val c2 = context.actorOf(Props[C], "C2")

  // The ask operation involves creating an internal actor for handling
  // this reply, which needs to have a timeout after which it is
  // destroyed in order not to leak resources.
  implicit val timeout: Timeout = Timeout(5.seconds)

  def receive = {
    case _ =>
      val f1 = c1 ? "anything" // Future[Any]
      val f2 = c2 ? "anything" // Future[Any]
      val result: Future[Double] = for {
        d1 <- f1.mapTo[Double]
        d2 <- f2.mapTo[Double]
      } yield d1 / d2
      // Send the combined result on; the parent is used here as in the
      // question, but any ActorRef would do
      result pipeTo context.parent
  }
}

class C extends Actor {
  def receive = {
    // Reply to the asker with a random Double
    case _ => sender() ! scala.util.Random.nextDouble()
  }
}
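With more than two children, the same idea can be extended by collecting the reply Futures in a sequence and combining them with Future.sequence. The following is a minimal sketch using only the Scala standard library. CombineReplies, collect and sumReplies are hypothetical names, and the Future[Any] values stand in for what child ? msg would return inside an actor.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper (not part of Akka): combines untyped replies,
// such as those returned by `?`, into a single typed result.
object CombineReplies {
  // Future.sequence preserves the order of `replies`, so position i of the
  // resulting Seq corresponds to the i-th child that was asked.
  def collect(replies: Seq[Future[Any]])(implicit ec: ExecutionContext): Future[Seq[Double]] =
    Future.sequence(replies.map(_.mapTo[Double]))

  // Example combination step: add up all the replies.
  def sumReplies(replies: Seq[Future[Any]])(implicit ec: ExecutionContext): Future[Double] =
    collect(replies).map(_.sum)
}
```

Inside an actor, with an implicit Timeout and ExecutionContext in scope, this could be used along the lines of CombineReplies.sumReplies(children.map(_ ? msg)) pipeTo target, where children and target are assumed names. If any single reply fails, times out, or is not a Double, the combined Future fails as a whole.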