I use the Alpakka Kafka connector to run a consumer stream inside Akka Streams. The stream spawns an inner stream for each partition assigned to this consumer instance. That means if an error occurs inside one of the partition streams, I need to restart either that partition's stream or everything.
The code is the following:
Consumer.DrainingControl<Done> control =
    Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsyncUnordered(
            maxPartitions,
            pair -> {
              Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> source =
                  pair.second();
              return source
                  .via(businessThatMayCrash())
                  .map(message -> message.committableOffset())
                  .runWith(Committer.sink(committerSettings), system);
            })
        .toMat(Sink.ignore(), Consumer::createDrainingControl)
        .run(system);
See "Separate streams per partition": https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition
But how can I restart the streams? Please note: if there is an error on the Kafka side, the consumer resumes automatically.
I tried to use a RestartSource for each partition, but once a stream has failed, the Kafka consumer no longer works:
https://doc.akka.io/docs/akka/current/stream/operators/RestartSource/onFailuresWithBackoff.html
Also, control.streamCompletion() never completes.
There is also the option of adding a watchTermination() for each partition that calls control.drainAndShutdown(..), but that seems quite complicated:
https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/watchTermination.html
What is the best solution for that?
The trick will be to keep the failure of the stream you're running inside mapAsyncUnordered from failing the mapAsyncUnordered itself. The easiest way to do this is to construct a CompletableFuture which will succeed whether the stream succeeds or fails. When the stream being run fails, a new source should be picked up by the committablePartitionedSource.
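To illustrate the future-handling part of that trick without the Kafka machinery, here is a minimal JDK-only sketch. `runPartitionStream` is a hypothetical stand-in for the `CompletionStage` materialized by `runWith(Committer.sink(...), system)` inside `mapAsyncUnordered`; `exceptionally` is what shields the outer stage from the inner failure (in the Alpakka code you would apply it to the real stream's completion stage, e.g. returning something like `streamCompletion.exceptionally(ex -> Done.done())`):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class SwallowFailure {

    // Hypothetical stand-in for the materialized value of one per-partition
    // stream: a CompletionStage that fails when the business logic crashes.
    static CompletionStage<String> runPartitionStream(boolean crash) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (crash) {
            f.completeExceptionally(new RuntimeException("boom"));
        } else {
            f.complete("done");
        }
        return f;
    }

    public static void main(String[] args) {
        // exceptionally() turns a failed stage into a successful one, so the
        // value handed back to mapAsyncUnordered never carries the failure
        // and the outer consumer stream keeps running.
        String result = runPartitionStream(true)
                .exceptionally(ex -> "recovered")
                .toCompletableFuture()
                .join();
        System.out.println(result); // prints "recovered"
    }
}
```

With the failure swallowed this way, mapAsyncUnordered completes that element normally instead of failing, so the surrounding committablePartitionedSource stream stays alive and can hand out a fresh source for the partition.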