What is a good pattern for committing a Kafka consumer offset after processing a message?


I am using Akka Streams Kafka to pipe Kafka messages to a remote service. I want to guarantee that the service receives every message exactly once (at-least-once AND at-most-once delivery).

Here's the code I came up with:

  private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                           topicPattern: String,
                           mapCommittableMessageToSinkMessage: Function[CommittableMessage[String, String], T]) {

    val groupId = config.getString("group-id")

    implicit val materializer = ActorMaterializer()

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withGroupId(groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    implicit val timeout = Timeout(5 seconds) // timeout for reply message on ask call below
    import system.dispatcher // the ExecutionContext that will be used in ask call below

    Consumer.committableSource(consumerSettings, Subscriptions
      .topicPattern(topicPattern))
      .map(message => (message, mapCommittableMessageToSinkMessage(message)))
      .mapAsync(1)(tuple => ask(subscriber, tuple._2).map(_ => tuple._1))
      .mapAsync(1)(message => message.committableOffset.commitScaladsl())
      .runWith(Sink.ignore)
  }

As the code shows, each element is mapped to a tuple of the original message and the transformed message that is passed to the subscriber (an actor that sends to the remote service). The tuple exists only so that the original message is still available to commit its offset once the subscriber has finished processing.

Something about it just seems like an anti-pattern, but I'm not sure of a better way to do it. Any suggestions?

Thanks!


2 answers

Best answer

One way to keep this cleaner and easier to change is to use the GraphDSL. It lets you split the graph into two branches: one carries the committable part of your message, while the other performs all the business logic. The two branches are zipped back together, so an offset is committed only after the business logic for that message has completed.

An example graph could look like this (omitting the boilerplate for clarity):

val src = Consumer.committableSource(consumerSettings, Subscriptions
      .topicPattern(topicPattern))

val businessLogic = Flow[CommittableMessage[String, String]].mapAsync(1)(message => ask(subscriber, mapCommittableMessageToSinkMessage(message)))

val snk = Flow[CommittableMessage[String, String]].mapAsync(1)(message => message.committableOffset.commitScaladsl())
      .to(Sink.ignore)  // use .to (not .runWith) so snk is a Sink that can be wired into the graph; look into Sink.foldAsync for a more compact re-write of this part

src ~> broadcast
       broadcast ~> businessLogic ~> zip.in0
       broadcast         ~>          zip.in1
                                     zip.out.map(_._2) ~> snk

Here is the full code that worked, using @stefano-bonetti's approach from the answer above:

  private def startStream[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                             topicSuffix: String,
                             convertCommittableMessageToSubscriberMessage: Function[CommittableMessage[String, String], T]) {

    val groupId = config.getString("group-id")
    val subscriberName = subscriber.path.name
    val customerId = config.getString("customer-id")
    val topicPattern = s"^$customerId\\.$topicSuffix$$"

    implicit val materializer = ActorMaterializer()

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withGroupId(s"$groupId.$subscriberName")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    implicit val timeout = Timeout(5 seconds) // timeout for reply message on ask call below
    import system.dispatcher // the ExecutionContext that will be used in ask call below

    val src = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))

    val businessLogic = Flow[CommittableMessage[String, String]]
      .mapAsync(1)(message => subscriber.ask(convertCommittableMessageToSubscriberMessage(message)))

    val snk = Flow[CommittableMessage[String, String]]
      .mapAsync(1)(message => message.committableOffset.commitScaladsl())
      .to(Sink.ignore)

    val decider: Supervision.Decider = {
      case e => {
        system.log.error("error in stream", e)
        Supervision.Stop
      }
    }

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[CommittableMessage[String, String]](2))
      val zip = builder.add(Zip[Any, CommittableMessage[String, String]])

      src ~> broadcast
      broadcast ~> businessLogic ~> zip.in0
      broadcast ~> zip.in1
      zip.out.map(_._2) ~> snk

      ClosedShape
    })
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .run(materializer)
  }
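
The ordering that the graph above enforces (process one message, then commit its offset, then move on to the next) can be modelled without Akka or Kafka. The following is only a minimal sketch under that assumption: `Record`, `CommitAfterProcessing`, `process`, and `commit` are hypothetical names invented for illustration, and plain `Future`s stand in for the `ask` call and for `commitScaladsl()`.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for a Kafka record together with its offset.
final case class Record(offset: Long, value: String)

object CommitAfterProcessing {
  // Handles records strictly one at a time, similar to chaining mapAsync(1) stages:
  // a record's offset is committed only after `process` succeeded for that record,
  // and the first failure prevents any later record from being processed or committed.
  def run(records: List[Record],
          process: Record => Future[Unit],
          commit: Long => Future[Unit])
         (implicit ec: ExecutionContext): Future[Unit] =
    records.foldLeft(Future.successful(())) { (previous, record) =>
      previous
        .flatMap(_ => process(record))        // business logic first
        .flatMap(_ => commit(record.offset))  // commit only on success
    }
}
```

If `process` fails for some record, the returned `Future` fails and that record's offset is never committed, so the record would be read again after a restart. The same holds if the application stops between processing and committing, which is why committing after processing is usually described as at-least-once rather than exactly-once delivery.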