Problem: When I restart, complete, or stop the stream, the old consumer does not shut down:
[INFO ] a.a.RepointableActorRef -
Message [akka.kafka.KafkaConsumerActor$Internal$Stop$]
from Actor[akka://ufo-sightings/deadLetters]
to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
was not delivered. [1] dead letters encountered.
Description: I'm building a service that receives messages from a Kafka topic and forwards each message to an external service via an HTTP request.
The connection to the external service can break, in which case my service needs to retry the request.
Additionally, if there is an error in the stream, the entire stream needs to restart.
Finally, sometimes I no longer need the stream and its corresponding Kafka consumer, and I would like to shut down the entire stream.
So I have a Stream:
Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)(Keep.left) // keep the Consumer.Control as the materialized value
  .run()
The HTTP request is sent in sourceFunction.
I followed the Kafka consumer restart instructions in the new documentation:
RestartSource.withBackoff(
  minBackoff = 20.seconds,
  maxBackoff = 5.minutes,
  randomFactor = 0.2) { () =>
  Consumer.committableSource(customizedSettings, subscriptions)
    .watchTermination() {
      case (consumerControl, streamComplete) =>
        logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
        consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened id = ${consumer.id} at ${DateTime.now}"))
        streamComplete
          .flatMap { _ =>
            consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
          }
          .recoverWith {
            case _ =>
              consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} ERROR:CLOSED FROM UPSTREAM"))
          }
    }
    .flatMapConcat(sourceFunction)
}
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.ignore)(Keep.left)
  .run
There is an open issue that discusses this non-terminating consumer in a complex Akka stream, but there is no solution yet.
Is there a workaround that forces the Kafka consumer to terminate?
How about wrapping the consumer in an actor and registering a KillSwitch? See: https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html#dynamic-stream-handling
Then, in the actor's postStop method, you can terminate the stream. By wrapping the actor in a BackoffSupervisor, you get the exponential backoff.
Example actor: https://github.com/tradecloud/kafka-akka-extension/blob/master/src/main/scala/nl/tradecloud/kafka/KafkaSubscriberActor.scala#L27
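A minimal sketch of that idea, not taken from the linked repository. It assumes Akka 2.5.22 or later (for BackoffOpts) and the Alpakka Kafka scaladsl. The names KafkaStreamActor, supervisedProps, and handle are hypothetical, and handle stands in for the HTTP call made in your sourceFunction. The backoff values are copied from your RestartSource settings.

```scala
import akka.Done
import akka.actor.{Actor, ActorLogging, Props, Status}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.pattern.{BackoffOpts, BackoffSupervisor, pipe}
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink}
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical actor: owns one run of the Kafka stream and tears it down in postStop.
class KafkaStreamActor(
    settings: ConsumerSettings[String, String],
    topic: String,
    handle: String => Future[Done] // assumption: stands in for the HTTP request
) extends Actor with ActorLogging {
  import context.dispatcher
  private implicit val mat: ActorMaterializer = ActorMaterializer()(context)

  private var control: Option[Consumer.Control] = None
  private var killSwitch: Option[UniqueKillSwitch] = None

  override def preStart(): Unit = {
    val ((c, ks), done) =
      Consumer.committableSource(settings, Subscriptions.topics(topic))
        .viaMat(KillSwitches.single)(Keep.both)
        .mapAsync(1)(msg => handle(msg.record.value))
        .toMat(Sink.ignore)(Keep.both)
        .run()
    control = Some(c)
    killSwitch = Some(ks)
    // Deliver stream completion or failure to this actor as a message.
    done.pipeTo(self)
  }

  def receive: Receive = {
    case Done =>
      context.stop(self) // stream completed: stop, the supervisor starts a new child
    case Status.Failure(e) =>
      log.error(e, "Kafka stream failed")
      context.stop(self) // stream failed: stop, the supervisor starts a new child
  }

  override def postStop(): Unit = {
    // Runs on every stop, including when the supervisor itself is stopped.
    killSwitch.foreach(_.shutdown())
    control.foreach(_.shutdown())
  }
}

object KafkaStreamActor {
  // Props for a BackoffSupervisor that recreates the child after it stops.
  def supervisedProps(
      settings: ConsumerSettings[String, String],
      topic: String,
      handle: String => Future[Done]): Props =
    BackoffSupervisor.props(
      BackoffOpts.onStop(
        Props(new KafkaStreamActor(settings, topic, handle)),
        "kafka-stream",
        20.seconds,
        5.minutes,
        0.2))
}
```

You would start it with something like system.actorOf(KafkaStreamActor.supervisedProps(settings, topic, handle)) and, when the stream is no longer needed, stop the returned supervisor ref with system.stop. Stopping the supervisor also stops the child, so postStop runs and shuts down both the KillSwitch and the Consumer.Control. Whether that is enough to get rid of the lingering kafka-consumer actor in your setup is something you would have to verify.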