I am running an Akka Streams Kafka application and I want to incorporate the supervision strategy on the stream consumer such that if the broker goes down, and the stream consumer dies after a stop timeout, the supervisor can restart the consumer.
Here is my complete code:
UserEventStream:
import akka.actor.{Actor, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.pattern.ask
import akka.stream.ActorMaterializer
class UserEventStream extends Actor {
val settings = Settings(context.system).KafkaConsumers
implicit val timeout: Timeout = Timeout(10 seconds)
implicit val materializer = ActorMaterializer()
override def preStart(): Unit = {
super.preStart()
println("Starting UserEventStream....s")
}
override def receive = {
case "start" =>
val consumerConfig = settings.KafkaConsumerInfo
println(s"ConsumerConfig with $consumerConfig")
startStreamConsumer(consumerConfig("UserEventMessage" + ".c" + 1))
}
def startStreamConsumer(config: Map[String, String]) = {
println(s"startStreamConsumer with config $config")
val consumerSource = createConsumerSource(config)
val consumerSink = createConsumerSink()
val messageProcessor = context.actorOf(Props[MessageProcessor], "messageprocessor")
println("START: The UserEventStream processing")
val future =
consumerSource
.mapAsync(parallelism = 50) { message =>
val m = s"${message.record.value()}"
messageProcessor ? m
}
.runWith(consumerSink)
future.onComplete {
case Failure(ex) =>
println("FAILURE : The UserEventStream processing, stopping the actor.")
self ! PoisonPill
case Success(ex) =>
}
}
def createConsumerSource(config: Map[String, String]) = {
val kafkaMBAddress = config("bootstrap-servers")
val groupID = config("groupId")
val topicSubscription = config("subscription-topic").split(',').toList
println(s"Subscriptiontopics $topicSubscription")
val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(kafkaMBAddress)
.withGroupId(groupID)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*))
}
def createConsumerSink() = {
Sink.foreach(println)
}
}
StreamProcessorSupervisor (this is the supervisor class of the UserEventStream class):
import akka.actor.{Actor, Props}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.ActorMaterializer
import stream.StreamProcessorSupervisor.StartClient
import scala.concurrent.duration._
object StreamProcessorSupervisor {
final case object StartSimulator
final case class StartClient(id: String)
def props(implicit materializer: ActorMaterializer) =
Props(classOf[StreamProcessorSupervisor], materializer)
}
class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
override def preStart(): Unit = {
self ! StartClient(self.path.name)
}
def receive: Receive = {
case StartClient(id) =>
println(s"startCLient with id $id")
val childProps = Props(classOf[UserEventStream])
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
childProps,
childName = "usereventstream",
minBackoff = 1.second,
maxBackoff = 1.minutes,
randomFactor = 0.2
)
)
context.actorOf(supervisor, name = s"$id-backoff-supervisor")
val userEventStrean = context.actorOf(Props(classOf[UserEventStream]),"usereventstream")
userEventStrean ! "start"
}
}
App (the main application class):
import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer
object App extends App {
implicit val system = ActorSystem("stream-test")
implicit val materializer = ActorMaterializer()
system.actorOf(StreamProcessorSupervisor.props,"StreamProcessorSupervisor")
}
application.conf:
kafka {
consumer {
num-consumers = "1"
c1 {
bootstrap-servers = "localhost:9092"
bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
groupId = "localakkagroup1"
subscription-topic = "test"
subscription-topic = ${?SUBSCRIPTION_TOPIC1}
message-type = "UserEventMessage"
poll-interval = 50ms
poll-timeout = 50ms
stop-timeout = 30s
close-timeout = 20s
commit-timeout = 15s
wakeup-timeout = 10s
max-wakeups = 10
use-dispatcher = "akka.kafka.default-dispatcher"
kafka-clients {
enable.auto.commit = true
}
}
}
}
After running the application, I purposely killed the Kafka broker and then found that after 30 seconds, the actor is stopping itself by sending a poison pill. But strangely it doesn't restart as mentioned in the BackoffSupervisor strategy.
What could be the issue here?
There are two instances of
UserEventStreamin your code: one is the child actor that theBackoffSupervisorinternally creates with thePropsthat you pass to it, and the other is theval userEventStreanthat is a child ofStreamProcessorSupervisor. You're sending the"start"message to the latter, when you should be sending that message to the former.You don't need
val userEventStrean, because theBackoffSupervisorcreates the child actor. Messages sent to theBackoffSupervisorare forwarded to the child, so to send a"start"message to the child, send it to theBackoffSupervisor:The other issue is that when an actor receives a
PoisonPill, that's not the same thing as that actor throwing an exception. Therefore,Backoff.onFailurewon't be triggered whenUserEventStreamsends itself aPoisonPill. APoisonPillstops the actor, so useBackoff.onStopinstead: