KSQL | Consumer lag | Confluent Cloud


I am using Kafka on Confluent Cloud as a message queue in the ecosystem. There are 2 topics, A and B.
Messages in B arrive a little later than the corresponding messages in A (with a delay of about 30 seconds).

I am joining these 2 topics using KSQL. The KSQL server is deployed on-premises and is connected to Confluent Cloud. In KSQL I join the 2 topics as streams on a common identifier, say requestId, and create a new stream C, which is the joined stream.
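
For reference, the join is created with a statement roughly like the one below (the column names and the join window are illustrative, not the exact query; a window of a few minutes comfortably covers the ~30 second delay between A and B):

CREATE STREAM C AS
  SELECT
    A.requestId AS requestId,
    A.payload   AS a_payload,
    B.payload   AS b_payload
  FROM A
    INNER JOIN B WITHIN 5 MINUTES
    ON A.requestId = B.requestId
  EMIT CHANGES;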

At times, stream C builds up a lag and stops processing messages from A and B. This lag is visible in the Confluent Cloud UI. When I log in to the KSQL server I can see the following error, and after a restart of the KSQL server everything works fine again. This happens intermittently every 2 - 3 days.

Here is my configuration for the KSQL server, which is deployed on-premises.

# A comma separated list of the Confluent Cloud broker endpoints
bootstrap.servers=${bootstrap_servers}
ksql.internal.topic.replicas=3
ksql.streams.replication.factor=3
ksql.logging.processing.topic.replication.factor=3
listeners=http://0.0.0.0:8088
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${bootstrap_auth_key}" password="${bootstrap_secret_key}";

# Schema Registry specific settings
ksql.schema.registry.basic.auth.credentials.source=USER_INFO
ksql.schema.registry.basic.auth.user.info=${schema_registry_auth_key}:${schema_registry_secret_key}
ksql.schema.registry.url=${schema_registry_url}

# Additional settings
ksql.streams.producer.delivery.timeout.ms=2147483647
ksql.streams.producer.max.block.ms=9223372036854775807
ksql.query.pull.enable.standby.reads=false
#ksql.streams.num.standby.replicas=3 // TODO if we need HA 1+1
#num.standby.replicas=3
# Automatically create the processing log topic if it does not already exist:
ksql.logging.processing.topic.auto.create=true

# Automatically create a stream within KSQL for the processing log:
ksql.logging.processing.stream.auto.create=true
compression.type=snappy
ksql.streams.state.dir=${base_storage_directory}/kafka-streams

Error messages in the KSQL server logs:

[2020-11-25 14:08:49,785] INFO stream-thread [_confluent-ksql-default_query_CSAS_WINYES01QUERY_0-04b1e77c-e2ba-4511-b7fd-1882f63796e5-StreamThread-2] State transition from RUNNING to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread:220)
[2020-11-25 14:08:49,790] ERROR [Consumer clientId=_confluent-ksql-default_query_CSAS_WINYES01QUERY_0-04b1e77c-e2ba-4511-b7fd-1882f63796e5-StreamThread-3-consumer, groupId=_confluent-ksql-default_query_CSAS_WINYES01QUERY_0] Offset commit failed on partition yes01-0 at offset 32606388: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1185)
[2020-11-25 14:08:49,790] ERROR [Consumer clientId=_confluent-ksql-default_query_CSAS_WINYES01QUERY_0-04b1e77c-e2ba-4511-b7fd-1882f63796e5-StreamThread-3-consumer, groupId=_confluent-ksql-default_query_CSAS_WINYES01QUERY_0] Offset commit failed on partition yes01-0 at offset 32606388: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1185)
[2020-11-25 14:08:49,790] WARN stream-thread [_confluent-ksql-default_query_CSAS_WINYES01QUERY_0-04b1e77c-e2ba-4511-b7fd-1882f63796e5-StreamThread-3] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group. (org.apache.kafka.streams.processor.internals.StreamThread:572)
org.apache.kafka.streams.errors.TaskMigratedException: Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated.
        at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1009)
        at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:962)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:851)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:714)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1251)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1158)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
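
The exception text above suggests tuning max.poll.interval.ms or max.poll.records on the query consumers. For completeness, such overrides could be expressed in ksql-server.properties using the same ksql.streams. prefix as the producer settings above (the values here are placeholders, not recommendations, and I have not verified them as a fix):

# Possible consumer overrides hinted at by the CommitFailedException above
# (placeholder values; assumes the consumer. prefix is forwarded to the
#  query consumers, analogous to the producer. settings earlier in this file)
ksql.streams.consumer.max.poll.interval.ms=600000
ksql.streams.consumer.max.poll.records=100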

Edit:

  • During this exception, I verified that the KSQL server has enough RAM and CPU.