Kafka Streams: Failed to rebalance due to Change log state changing during restoring


Need some help figuring out the exception that I receive in one of my Kafka Streams consumers.

I have implemented a Kafka Streams application with the low-level Processor API. Each update we receive from Kafka is merged into a key-value store, so that the state is maintained. Initially we ran only one consumer, and after some time we tried bringing up a second one. During re-balancing, the second consumer threw an exception stating that it had failed to rebalance because the state of a changelog had changed (exception shared below). I assume that while the rebalance was in progress, some updates were received by the first consumer and were therefore pushed to the respective changelog. Please help. I am also sharing sample code for this. I am using Kafka 2.11-0.10.2.1 and the topic has 72 partitions.

Exception

    Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
    Caused by: java.lang.IllegalStateException: task [0_60] Log end offset of Kafka-xxxxxxxxxxxxxxxx-InfoStore-changelog-60 should not change while restoring: old end offset 80638, current offset 80640
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:252)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:56)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)

Code Snippet

    public class InfoProcessor extends AbstractProcessor<Key, Update> {
        private static Logger logger = Logger.getLogger(InfoProcessor.class);
        private ProcessorContext context;
        private KeyValueStore<Key, Info> infoStore;
        private int visitProcessorInstanceId;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
            infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
        }

        @Override
        public void process(Key key, Update update) {
            try {
                if (key != null && update != null) {
                    Info info = infoStore.get(key);
                    // merge logic
                    infoStore.put(key, info);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            context.commit();
        }

        @Override
        public void punctuate(long timestamp) {
            try {
                KeyValueIterator<Key, Info> iter = this.infoStore.all();
                while (iter.hasNext()) {
                    // processing logic
                }
                iter.close();
                context.commit();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
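
For reference, a minimal sketch of how such a processor and its persistent store would typically be wired up in the 0.10.2.x Processor API (the topic name, serde variables, and props below are placeholders, not my actual configuration):

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("Source", "updates-topic")                  // placeholder input topic
           .addProcessor("InfoProcessor", InfoProcessor::new, "Source")
           .addStateStore(Stores.create("InfoStore")
                   .withKeys(keySerde)                            // custom serde for Key (placeholder)
                   .withValues(infoSerde)                         // custom serde for Info (placeholder)
                   .persistent()                                  // persistent store, backed by the changelog topic
                   .build(), "InfoProcessor");

    KafkaStreams streams = new KafkaStreams(builder, props);      // props: application id, bootstrap servers, etc.
    streams.start();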

Thank you.

Best Answer

Your observation and reasoning are correct. This can happen if a rebalance due to state migration takes long and another rebalance happens:

  1. First instance is running
  2. Second instance starts, triggering a rebalance
    • Second instance recreates state
  3. Another rebalance happens (not sure how this could be triggered in your case)
    • Second instance is still recreating state and does not rejoin the group (thus it drops out of the group)
  4. First instance gets the state migrated back (it still has a full copy of the state, so there is nothing to recreate; the second instance did not start to process anything yet) and resumes writing to the changelog topic
  5. Second instance dies with the exception described above

Can you verify this? If yes, you need to avoid a second rebalance as long as state recreation is running.
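
For example (a sketch only; the exact trigger of the second rebalance in your setup is unknown, and the values below are assumptions), you can give a restoring instance more headroom and keep standby replicas so that restoration after a rebalance is shorter:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "your-app-id");      // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
    // more time inside poll() before the coordinator evicts the instance
    // and triggers yet another rebalance
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
    // keep a hot standby copy of each state store so a newly assigned task
    // has less changelog to replay
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);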

Btw: this behavior is already improved in trunk and will be fixed in the upcoming 0.11.0.1 release. You can update your Kafka Streams application to 0.11.0.1 without upgrading the brokers. 0.11.0.1 should be released within the next couple of weeks.