Akka streams kafka commit offset after filter


I am trying to implement an at-least-once commit strategy for offsets in Akka Streams, and I do not understand what the expected pattern is when I use filter on my stream.

My expectation is that none of the filtered messages will get their offsets committed, so they will end up in an infinite loop of processing.

An absurd example illustrating this is filtering out all messages, like this:

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl()) 
.runWith(Sink.ignore)
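
To make the concern concrete, the pipeline can be modelled with plain collections. This is a dependency-free sketch, not real Akka code. It assumes the usual Kafka commit semantics: the committed value is a per-partition position, so committing offset n stores n + 1 as the next offset to read and implicitly covers everything before it. `Msg`, `committedPosition`, and the sample data are made up for illustration.

```scala
// Plain-Scala model of "filter, then commit"; no Akka or Kafka involved.
// Assumption: a commit for offset n stores n + 1 as the resume position.
object FilterCommitModel {
  final case class Msg(offset: Long, value: String)

  // Position the consumer group would resume from after the stream has run.
  // `start` is the position before the run; only messages that pass `keep`
  // ever reach the commit stage.
  def committedPosition(start: Long, msgs: Seq[Msg])(keep: Msg => Boolean): Long =
    msgs.filter(keep).foldLeft(start)((pos, m) => math.max(pos, m.offset + 1))

  val batch: Seq[Msg] = (0L until 6L).map(o => Msg(o, s"v$o"))

  // Some messages pass: the commit for offset 4 also covers the dropped 1 and 3.
  val someKept: Long = committedPosition(0L, batch)(_.offset % 2 == 0)

  // Trailing messages dropped: offsets 4 and 5 would be redelivered on restart.
  val tailDropped: Long = committedPosition(0L, batch)(_.offset < 4)

  // Everything dropped, as in the example above: the position never moves.
  val noneKept: Long = committedPosition(0L, batch)(_ => false)
}
```

In this model a dropped message is only reprocessed when nothing after it on the same partition gets committed. The all-false filter is the extreme case, where no commit ever happens.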

The only solution I can see is wrapping my filters in flows that check whether the logic will filter a message out and commit in that case, but this seems inelegant and diminishes the value of having filter shapes.
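
That wrapping idea can be sketched without Akka as follows. This is a plain-collections model, not stream code: instead of dropping a message, the filter step turns it into an envelope that keeps the offset and carries an empty payload, so every offset still reaches the commit step. `Envelope`, `tag`, `process`, and `run` are hypothetical names. In the real stream the same shape would presumably be a `map` in place of `filter`, with `committableOffset` as the carried value.

```scala
// Plain-Scala model of "tag instead of filter"; no Akka or Kafka involved.
object PassThroughModel {
  final case class Msg(offset: Long, value: String)

  // Carries the offset for every message; payload is None when filtered out.
  final case class Envelope[A](offset: Long, payload: Option[A])

  // Replaces `filter`: rejected messages lose their payload but keep their offset.
  def tag(keep: Msg => Boolean)(m: Msg): Envelope[String] =
    Envelope(m.offset, if (keep(m)) Some(m.value) else None)

  // Stand-in business logic; runs only on envelopes that still have a payload.
  def process(e: Envelope[String]): Envelope[String] =
    e.copy(payload = e.payload.map(_.toUpperCase))

  // Returns (processed values, offsets handed to the commit step).
  def run(msgs: Seq[Msg])(keep: Msg => Boolean): (Seq[String], Seq[Long]) = {
    val out = msgs.map(tag(keep)).map(process)
    (out.flatMap(_.payload), out.map(_.offset))
  }

  val batch: Seq[Msg] = Seq(Msg(0L, "a"), Msg(1L, "b"), Msg(2L, "c"))

  // "b" is filtered out of processing, but its offset is still committed.
  val (processed, committed) = run(batch)(_.value != "b")
}
```

The cost is that every downstream stage has to know about the empty case, which is exactly what makes the approach feel heavier than a plain filter.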

Filtering is not a rare thing to do, but I cannot see any elegant way of committing the offset. It seems strange to me that the framework offers no way to do this, so what am I missing?

Best answer

I was not able to find a solution with the current Akka implementation for more intelligent committing of offsets, so I delegated the responsibility to Kafka by enabling auto-commit at the Kafka level. I combined this with a graceful shutdown strategy for the app, so that when a blue/green deployment happens all the messages are processed before the application closes.

  • Autocommit to true:
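import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
// withProperty takes string values, so the flag is passed as "true"
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")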
  • Graceful shutdown:
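import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import scala.concurrent.Await
import scala.concurrent.duration._
// The actor system is passed explicitly as the materializer's context
val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))(system)
// On JVM shutdown, terminate the actor system and wait up to 30 seconds for it
scala.sys.addShutdownHook {
        actorMaterializer.system.terminate()
        Await.result(actorMaterializer.system.whenTerminated, 30.seconds)
}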