Apache Bahir, send stuff to ActorReceiver

I am trying to set up a simple process with Spark Streaming, using Apache Bahir to connect to Akka. I tried to follow their example together with this older one. I have a simple forwarder actor:

class ForwarderActor extends ActorReceiver {
  def receive = {
    case data: MyData => store(data)
  }
}

and I create a stream with

val stream = AkkaUtils.createStream[RSVP](ssc, Props[ForwarderActor], actorName)

the configuration looks like this:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "localhost"
      port = 7777
    }
  }
}

My problem is: how do I send messages to the Forwarder actor? Maybe I don't understand how Akka Remote is used in this case. When the app starts, I see the log line

[akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://test@localhost:7777]

and later on I see

[akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://<system>@<host>:52369]

which seems to match the description in the ScalaDoc:

 /**
   * A default ActorSystem creator. It will use a unique system name
   * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
   * communication.
   */

All in all I am not sure how I am supposed to send messages to the Forwarder actor. Thanks for any help!


Akka actors can send messages to other Akka actors running on a remote JVM, but to do so the sender actor needs to know the address of the intended receiver actor.
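For reference, a remote actor's address follows the pattern visible in the question's log lines (`akka.tcp://<system>@<host>:<port>`), extended with the actor's path under `/user`. The helper below is a minimal sketch assuming classic Akka remoting over netty.tcp and an actor created directly with `actorSystem.actorOf(..., name)`; the name `remoteActorPath` is hypothetical and not part of Akka or Bahir.

```scala
object RemotePaths {
  // Hypothetical helper: builds a classic Akka remoting (netty.tcp) path
  //   akka.tcp://<system>@<host>:<port>/user/<actorName>
  // for a top-level actor created with actorSystem.actorOf(..., actorName).
  def remoteActorPath(system: String, host: String, port: Int, actorName: String): String =
    s"akka.tcp://$system@$host:$port/user/$actorName"

  def main(args: Array[String]): Unit = {
    // Values taken from the data-source application in this answer.
    println(remoteActorPath("my-actor-system", "my-ip-address", 18000, "my-actor"))
  }
}
```

The sender side has to know all four parts (system name, host, port, actor name) up front, which is exactly what is hard to know for an actor running inside a Spark executor.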

AkkaUtils (Bahir) lets you create a Spark stream from the messages that an ActorReceiver receives. But where is it going to receive messages from? From some remote actor. And to send those messages, that remote actor needs the address of your ActorReceiver, which is running inside your Spark application.

In general, you cannot know in advance which machine (and IP address) your Spark receiver will run on. So instead, we make the actor running inside Spark contact the producer actor, send it its own reference, and ask it to send its data.
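The handshake can be modeled without Akka to make the direction of each message explicit. This is a plain-Scala sketch, assuming the reply-to reference can be represented as a callback (in Akka it is the `ActorRef` carried inside the request); `StringsRequest` and `DataSource` are illustrative names only.

```scala
import scala.collection.mutable.ListBuffer

// Plain-Scala model of the "tell me where to reply" handshake; no Akka involved.
// The reply-to reference is modeled as a callback (an assumption for illustration);
// in Akka it would be an ActorRef such as the receiver's `self`.
final case class StringsRequest(replyTo: String => Unit)

final class DataSource(strings: List[String]) {
  // The producer never needs the consumer's address in advance:
  // it replies to whatever reference arrives inside the request.
  def handle(request: StringsRequest): Unit =
    strings.foreach(request.replyTo)
}

object HandshakeDemo {
  def main(args: Array[String]): Unit = {
    val received = ListBuffer.empty[String]
    val source = new DataSource(List("one", "two", "three"))
    // The consumer (the Spark-side receiver in the real setup) initiates contact
    // and hands over its own reference.
    source.handle(StringsRequest { s => received += s; () })
    println(received.toList)
  }
}
```

Only the producer's address has to be fixed and known; the consumer's address travels inside the request.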

Just make sure that both applications are built with the same Scala version and run on the same JRE.

First, let's write the actor that will be the data source:

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Message protocol. Both applications need these exact classes
// (same package and class names), e.g. from a shared library.
case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)

class MyActor extends Actor with ActorLogging {

  val theListOfMyStrings = List("one", "two", "three")

  override def receive: Receive = {
    // Reply to whichever actor asked, using the reference it sent us.
    case SendMeYourStringsRequest(requesterRef) =>
      log.info("Sending strings to {}", requesterRef)
      theListOfMyStrings.foreach(s => requesterRef ! RequestedString(s))
  }
}

object MyApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka {
      |  actor {
      |    provider = "akka.remote.RemoteActorRefProvider"
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname = "my-ip-address"
      |      port = 18000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("my-actor-system", config)

  // Reachable remotely as akka.tcp://my-actor-system@my-ip-address:18000/user/my-actor
  val myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")
}

Now let's write the simple Spark app:

import akka.actor.{ActorPath, ActorRef, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

// Same message classes as in the data-source application.
case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)

class YourStringRequesterActor extends ActorReceiver {
  def receive: Receive = {
    // Every string pushed by the remote actor becomes an element of the stream.
    case RequestedString(s) => store(s)
  }

  override def preStart(): Unit = {
    // The data source's address is fixed and known; ours is not. So we contact it
    // and include our own reference (self) so it knows where to reply.
    val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@my-ip-address:18000/user/my-actor")
    val myActorSelection = context.actorSelection(myActorPath)

    myActorSelection ! SendMeYourStringsRequest(self)
  }
}

object YourSparkApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ActorWordCount")

    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[2]")
    }

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val stringStream = AkkaUtils.createStream[String](
      ssc,
      Props(classOf[YourStringRequesterActor]),
      "your-string-requester-actor"
    )

    // Print the first elements of every batch.
    stringStream.print()

    // Nothing is received until the streaming context is started.
    ssc.start()
    ssc.awaitTermination()
  }
}

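Note: replace my-ip-address with the actual address of the machine running the data-source application, both in its config and in the path used by the Spark app. If there are any other problems, please let me know in the comments.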
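One way to avoid hard-coding my-ip-address is to build the data source's remoting config at start-up. A minimal sketch, assuming the host and port come from environment variables named `DATA_SOURCE_HOST` and `DATA_SOURCE_PORT` (hypothetical names) and falling back to `127.0.0.1:18000`:

```scala
// Sketch: build the remoting config for the data-source application from a
// host/port supplied at start-up instead of the hard-coded "my-ip-address".
// DATA_SOURCE_HOST and DATA_SOURCE_PORT are hypothetical environment variable names.
object DataSourceConfig {
  def remotingConfig(host: String, port: Int): String =
    s"""akka {
       |  actor {
       |    provider = "akka.remote.RemoteActorRefProvider"
       |  }
       |  remote {
       |    enabled-transports = ["akka.remote.netty.tcp"]
       |    netty.tcp {
       |      hostname = "$host"
       |      port = $port
       |    }
       |  }
       |}""".stripMargin

  def main(args: Array[String]): Unit = {
    val host = sys.env.getOrElse("DATA_SOURCE_HOST", "127.0.0.1")
    val port = sys.env.get("DATA_SOURCE_PORT").map(_.toInt).getOrElse(18000)
    // This string could be passed to ConfigFactory.parseString(...) in MyApplication.
    println(remotingConfig(host, port))
  }
}
```

The Spark side must then use the same host and port in the path it passes to `ActorPath.fromString`, so in practice both values would come from the same deployment setting.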