Gracefully restart a Reactive-Kafka Consumer Stream on failure

Problem: When I restart, complete, or stop the stream, the old consumer does not shut down:

[INFO ] a.a.RepointableActorRef -
  Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] 
  from Actor[akka://ufo-sightings/deadLetters]
  to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
  was not delivered. [1] dead letters encountered.

Description: I'm building a service that receives messages from a Kafka topic and sends each message to an external service via an HTTP request.

  1. The connection to the external service can break, in which case my service needs to retry the request.

  2. Additionally, if there is an error in the stream, the entire stream needs to restart.

  3. Finally, sometimes I no longer need the stream and its corresponding Kafka consumer, and I would like to shut down the entire stream.

So I have a Stream:

Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)(Keep.left)
  .run

The HTTP request is sent in sourceFunction.

I followed the Kafka consumer restart instructions in the new documentation:

  RestartSource.withBackoff(
      minBackoff = 20.seconds,
      maxBackoff = 5.minutes,
      randomFactor = 0.2 ) { () =>
          Consumer.committableSource(customizedSettings, subscriptions)
            .watchTermination() {
                case (consumerControl, streamComplete) =>
                  logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
                  consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened id = ${consumer.id} at ${DateTime.now}"))
                  streamComplete
                    .flatMap { _ =>
                      consumerControl.shutdown().map(_ => logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
                    }
                    .recoverWith {
                      case _ =>
                        consumerControl.shutdown().map(_ => logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} ERROR:CLOSED FROM UPSTREAM"))
                    }
             }
            .flatMapConcat(sourceFunction)
      }
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.left)
      .run
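
For reference, the last three lines above materialize a KillSwitch that sits outside the RestartSource. A minimal sketch of how such a switch can be used to stop the whole stream, assuming the Akka 2.5 streams API: Source.tick is only a stand-in for Consumer.committableSource, and the object name, method name, and timings are hypothetical.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object KillSwitchSketch {

  // Runs a restartable stream for a short while, then stops it via the KillSwitch.
  def runAndStop(): Done = {
    implicit val system: ActorSystem = ActorSystem("kill-switch-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (killSwitch, streamDone) =
        RestartSource
          .withBackoff(minBackoff = 1.second, maxBackoff = 5.seconds, randomFactor = 0.2) { () =>
            // Assumption: stand-in for Consumer.committableSource(settings, subscriptions).
            Source.tick(0.millis, 100.millis, "message")
          }
          .viaMat(KillSwitches.single)(Keep.right) // keep the UniqueKillSwitch
          .toMat(Sink.ignore)(Keep.both)           // also keep the completion Future
          .run()

      Thread.sleep(500)     // let a few elements flow
      killSwitch.shutdown() // completes downstream and cancels the RestartSource
      Await.result(streamDone, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"Stream finished with: ${runAndStop()}")
}
```

The switch only signals the stream from the outside; whether the wrapped Kafka consumer actor actually shuts down when the inner source is cancelled is exactly what the question is about, so this sketch does not demonstrate that part.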

There is an open issue that discusses this non-terminating consumer in a complex Akka stream, but there is no solution yet.

Is there a workaround that forces the Kafka consumer to terminate?

There is 1 best solution below

How about wrapping the consumer in an Actor and registering a KillSwitch? See: https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html#dynamic-stream-handling

Then, in the Actor's postStop method, you can terminate the stream. By wrapping the Actor in a BackoffSupervisor, you get exponential backoff on restarts.

Example actor: https://github.com/tradecloud/kafka-akka-extension/blob/master/src/main/scala/nl/tradecloud/kafka/KafkaSubscriberActor.scala#L27
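
A minimal sketch of this approach, assuming the Akka 2.5 API and not taken from the linked repository: StreamRunnerActor, BackoffStreamSketch, and all names and timings are hypothetical, and Source.tick stands in for Consumer.committableSource. The actor stops itself when the stream completes or fails, so the BackoffSupervisor (which restarts its child after it stops) starts a fresh actor and stream after the backoff delay.

```scala
import akka.Done
import akka.actor.{Actor, ActorLogging, ActorSystem, Props, Status}
import akka.pattern.{pipe, BackoffSupervisor}
import akka.stream.{ActorMaterializer, KillSwitches, Materializer, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._

// Hypothetical actor that owns one run of the stream.
class StreamRunnerActor(implicit materializer: Materializer) extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext needed by pipeTo

  private var killSwitch: Option[UniqueKillSwitch] = None

  override def preStart(): Unit = {
    val (switch, streamDone) =
      Source.tick(0.millis, 100.millis, "message") // assumption: stand-in for the Kafka source
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(Sink.ignore)(Keep.both)
        .run()
    killSwitch = Some(switch)
    // Delivers Done on completion or Status.Failure on error to this actor.
    streamDone.pipeTo(self)
  }

  override def receive: Receive = {
    case Done =>
      log.info("Stream completed, stopping actor")
      context.stop(self)
    case Status.Failure(cause) =>
      log.error(cause, "Stream failed, stopping actor")
      context.stop(self)
  }

  // Runs whenever the actor stops, including when the supervisor is stopped.
  override def postStop(): Unit = killSwitch.foreach(_.shutdown())
}

object BackoffStreamSketch {

  // Props for a supervisor that recreates the child, with backoff, after it stops.
  def supervisorProps(implicit materializer: Materializer): Props =
    BackoffSupervisor.props(
      Props(new StreamRunnerActor), // child props
      "stream-runner",              // child name
      20.seconds,                   // min backoff
      5.minutes,                    // max backoff
      0.2)                          // random factor

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("backoff-stream-sketch")
    implicit val materializer: Materializer = ActorMaterializer()

    val supervisor = system.actorOf(supervisorProps, "stream-runner-supervisor")

    Thread.sleep(1000)
    // Stopping the supervisor stops the child, whose postStop triggers the KillSwitch.
    system.stop(supervisor)
    system.terminate()
  }
}
```

In this sketch, stopping the supervisor is the way to shut everything down for good, while a stop initiated by the child itself is treated as a reason to restart.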