Resetting offset for partition after (Re-)joining group

Here is the problem: due to some issue (probably network-related), our application loses its connection to 3 of our 5 Kafka brokers:

Error sending fetch request (sessionId=2108009367, epoch=INITIAL) to node 4: 
org.apache.kafka.common.errors.DisconnectException.
Error sending fetch request (sessionId=1169924518, epoch=INITIAL) to node 1: 
org.apache.kafka.common.errors.DisconnectException.
Error sending fetch request (sessionId=1156529071, epoch=INITIAL) to node 5: 
org.apache.kafka.common.errors.DisconnectException.

After this there is a large number of similar log messages about the group coordinator being unavailable:

Group coordinator <host>:9093 (id: 2147483642 rack: null) is unavailable or invalid, will attempt rediscovery
Discovered group coordinator <host>:9093 (id: 2147483642 rack: null)

This repeats hundreds of times, and eventually the consumer group rejoins successfully, but the previously committed offsets are gone:

Revoking previously assigned partitions [<topic>-0, <topic>-1, <topic>-2]
(Re-)joining group
Successfully joined group with generation 1
Setting newly assigned partitions: <topic>-0, <topic>-1, <topic>-2
Found no committed offset for partition <topic>-0
Found no committed offset for partition <topic>-1
Found no committed offset for partition <topic>-2
Resetting offset for partition <topic>-0 to offset 3812363 (first within retention period).

The problem is that offset 3812363 is not the correct offset: it is the earliest offset still within the retention period. So after rebalancing and rejoining the consumer group we resume from the wrong position, and because of that our application reprocesses a huge amount of data from the retention period, which is really bad :(
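
If I understand correctly, which offset the consumer falls back to when no committed offset is found is controlled by the auto.offset.reset consumer property. For reference, a minimal sketch of how it can be set on the Akka Streams Kafka ConsumerSettings (the actor system name, broker address and group id below are placeholders, not our real config):

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

implicit val system: ActorSystem = ActorSystem("consumer") // placeholder name

// "earliest" matches what we see in the logs (fall back to the start of retention);
// "latest" would instead skip to the end of the log when no committed offset is found.
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  .withBootstrapServers("broker1:9093")   // placeholder
  .withGroupId("my-consumer-group")       // placeholder
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")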

Has anybody faced similar issues? I will be happy to share any information (logs, configs) that you find useful, just let me know!

UPD Consumer code:

def receive: Receive = {
  case job: Job =>
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .collect { case msg @ JsonTx(tx) => (msg, tx) }
      .mapAsync(streamParallelism) { case (msg, tx) =>
        currencyNormalizer.normalize(tx).map { msg -> _ }
      }
      .mapAsync(streamParallelism) { case (kafkaMsg, message) =>
        // build tx and persist to DB
      }
      .map { case (kafkaMsg, message) =>
        // send to other topic
        ProducerMessage.Message(
          new ProducerRecord[String, String](configService.topic, message.toString()),
          kafkaMsg.committableOffset)
      }
      .via(Producer.flow(producerSettings))
      .map(_.message.passThrough)
      .batch(max = 100, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
        batch.updated(elem)
      }
      .mapAsync(streamParallelism) { msg =>
        commitOffsets(msg)
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.ignore)
    ()
}
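
The decider used in withAttributes above is an Akka Streams supervision decider; I haven't included it, but a simplified sketch of that kind of decider looks like this (illustrative only, the real one in our code is more involved):

import akka.stream.Supervision
import scala.util.control.NonFatal

// Simplified example: log the failure and keep the stream running;
// the real decider handles specific exception types differently.
val decider: Supervision.Decider = {
  case NonFatal(e) =>
    logger.error("Stream stage failed, resuming", e)
    Supervision.Resume
  case _ =>
    Supervision.Stop
}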

private def commitOffsets(x: ConsumerMessage.CommittableOffsetBatch): Future[akka.Done] = {
  retry
    .Backoff(max = maxRetries, delay = offsetRetryDelay, base = offsetBackoffBase)
    .apply {
      Try(x.commitScaladsl()) match {
        case Success(value) => value
        case Failure(e) =>
          logger.warn("Failed to commit offsets. Retrying...", e)
          throw e
      }
    }(retry.Success[akka.Done] { _ => true }, ec)
}
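
The retry parameters (maxRetries, offsetRetryDelay, offsetBackoffBase) are read from our configuration; the values below are only illustrative, not our production settings:

import scala.concurrent.duration._

// Illustrative values only; the real numbers come from configuration.
val maxRetries: Int = 5
val offsetRetryDelay: FiniteDuration = 1.second
val offsetBackoffBase: Int = 2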

UPD 2 The dependency we use; I don't see any Alpakka dependencies in the code:

"com.typesafe.akka" %% "akka-stream-kafka" % Versions.akkaKafka