I have begun working with TypedActors in Scala and have run into a problem doing something pretty simple: I want Actor A to call a method on Actor B and process the result within an anonymous function on Actor A, but ensuring that:
- My response-handling function is thread-safe, e.g. will not run concurrently with any other threads accessing Actor A’s state
- My response-handling function can reference the context of Actor A
How can I (or can I) satisfy both of those requirements?
For example, this actor just wants to call an API on otherActor that returns a Future[Int], and update it's state with the result and then do something that requires it's actor context:
class MyActorImpl extends MyActor {
// my mutable state
var myNumber = 0
// method proxied by TypedActor ref:
def doStuff(otherActor: OtherActor): Unit = {
otherActor.doOtherStuff onSuccess {
// oops this is no longer running in MyActorImpl..
// this could be on a concurrent thread if we
case i => processResult(i)
}
}
private def processResult(i: Int): Unit = {
myNumber = 0 // oops, now we are possibly making a concurrent modification
println(s"Got $i")
// fails with java.lang.IllegalStateException: Calling TypedActor.context
// outside of a TypedActor implementation method!
println(s"My context is ${TypedActor.context}")
}
}
What am I missing here? Do I need to write my handler to call a method defined on the proxied interface to guarantee single-entry? That would seem ugly if I do not want to expose that particular “private” method (e.g. processResult) on an interface.
Here is a full version that will run in the Scala REPL:
import akka.actor._
import scala.concurrent._
val system = ActorSystem("mySystem")
import system.dispatcher
trait OtherActor {
def doOtherStuff(): Future[Int]
}
trait MyActor {
def doStuff(otherActor: OtherActor): Unit
}
class OtherActorImpl extends OtherActor {
var i = 0
def doOtherStuff(): Future[Int] = {
i += 1
Future {i}
}
}
class MyActorImpl extends MyActor {
// my mutable state
var myNumber = 0
// method proxied by TypedActor ref:
def doStuff(otherActor: OtherActor): Unit = {
otherActor.doOtherStuff onSuccess {
// oops this is no longer running in MyActorImpl..
// this could be on a concurrent thread if we
case i => processResult(i)
}
}
private def processResult(i: Int): Unit = {
myNumber = 0 // oops, now we are possibly making a concurrent modification
println(s"Got $i")
// fails with java.lang.IllegalStateException: Calling TypedActor.context
// outside of a TypedActor implementation method!
println(s"My context is ${TypedActor.context}")
}
}
val actor1: MyActor = TypedActor(system).typedActorOf(TypedProps[MyActorImpl])
val actor2: OtherActor = TypedActor(system).typedActorOf(TypedProps[OtherActorImpl])
actor1.doStuff(actor2)
You are exposing state of the Actor to outside world and that's a very bad thing. Look here: http://doc.akka.io/docs/akka/2.3.3/general/jmm.html section Actors and shared mutable state lines 9-10 describe your case.
@philwalk already described how you could fix this problem: Akka TypedActor - how to correctly handle async responses with context