How to set consumer config values for Kafka Mirrormaker-2 2.6.1?


I am attempting to use MirrorMaker 2 to replicate data between two AWS Managed Kafka (MSK) clusters in different AWS regions: one in eu-west-1 (CLOUD_EU) and the other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1. For testing, I am currently replicating topics one way only, from EU -> NA.

I am starting a MirrorMaker Connect cluster using ./bin/connect-mirror-maker.sh and a properties file (included below).

This works fine for topics with small messages, but one of my topics has binary messages up to 20MB in size. When I try to replicate that topic, I get an error every 30 seconds:

    [2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}. (org.apache.kafka.clients.FetchSessionHandler:481)
    org.apache.kafka.common.errors.DisconnectException

Logging at DEBUG level gives more detail:

    [2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] Disconnecting from node 2 due to request timeout. (org.apache.kafka.clients.NetworkClient:784)
    [2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-29, correlationId=35) due to node 2 being disconnected (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)

It gets stuck in a loop, disconnecting with a request timeout every 30 seconds and then retrying.
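One way to confirm that 30-second cadence is to measure the gap between consecutive disconnect errors in the log. The following is a minimal sketch, assuming the timestamp format shown in the excerpts above; the function name `disconnect_intervals` and the file name `mm2.log` in the usage note are hypothetical.

```python
import re
from datetime import datetime, timedelta

# Timestamp prefix as it appears in the log excerpts above,
# e.g. "[2022-04-21 13:47:05,268] INFO ..."
TIMESTAMP = re.compile(r"^\s*\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),(\d{3})\]")


def disconnect_intervals(lines, marker="Error sending fetch request"):
    """Return the seconds between consecutive log lines that contain `marker`."""
    stamps = []
    for line in lines:
        if marker not in line:
            continue
        match = TIMESTAMP.match(line)
        if match is None:
            continue  # continuation line or unexpected format
        stamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
        stamps.append(stamp + timedelta(milliseconds=int(match.group(2))))
    return [
        (later - earlier).total_seconds()
        for earlier, later in zip(stamps, stamps[1:])
    ]
```

For example, `with open("mm2.log") as f: print(disconnect_intervals(f))` would print one gap per retry. Gaps clustering around 30 seconds are consistent with the default request.timeout.ms still being in effect.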

Looking at this, I suspect the problem is that request.timeout.ms is at its default (30 seconds), so the fetch times out while reading a topic with many large messages.
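As a rough sanity check on that suspicion, the sketch below computes the sustained throughput a fetch response of a given size would need in order to arrive before the timeout. The 20MB and 30-second figures come from the question; the function name is hypothetical, and the calculation ignores broker processing time, TLS overhead, and cross-region latency, so it only gives a lower bound.

```python
def min_throughput_bytes_per_sec(response_bytes, request_timeout_ms):
    """Lowest sustained throughput that delivers `response_bytes` within the timeout."""
    if request_timeout_ms <= 0:
        raise ValueError("request_timeout_ms must be positive")
    return response_bytes / (request_timeout_ms / 1000.0)


# One 20MB message (decimal megabytes) under the default and the desired timeout.
for timeout_ms in (30_000, 120_000):
    rate = min_throughput_bytes_per_sec(20_000_000, timeout_ms)
    print(f"timeout={timeout_ms} ms needs at least {rate / 1e6:.2f} MB/s")
```

If the measured cross-region throughput per connection is close to or below the first figure, a longer timeout (or smaller fetch sizes) would be the natural thing to try.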

I followed the guide at https://github.com/apache/kafka/tree/trunk/connect/mirror to configure the consumer properties. However, no matter what I set, the consumer timeout stays at the default, as confirmed both by the config Kafka prints in the log and by timing the interval between disconnect messages. For example, I set:

    CLOUD_EU.consumer.request.timeout.ms=120000

in the properties file that I start MM2 with.

Based on various guides I found while looking into this, I have also tried:

    CLOUD_EU.request.timeout.ms=120000
    CLOUD_EU.cluster.consumer.request.timeout.ms=120000
    CLOUD_EU.consumer.override.request.timeout.ms=120000
    CLOUD_EU.cluster.consumer.override.request.timeout.ms=120000

None of these worked.

How can I change the consumer request.timeout.ms setting? The log is approximately 10,000 lines long, but everywhere the ConsumerConfig is logged it shows request.timeout.ms = 30000.
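Rather than reading a 10,000-line log by eye, the effective values can be tallied with a short script. This is a sketch under an assumption about the log layout: that each client prints a header line ending in `<Something>Config values:` followed by indented `key = value` lines, which is how Kafka clients normally log their configuration. The function name `effective_values` and the file name `mm2.log` are hypothetical.

```python
import re
from collections import Counter

# Header of a config dump, e.g. "... INFO ConsumerConfig values: "
BLOCK_START = re.compile(r"\b(\w+Config) values:")
# Indented "key = value" line inside a config dump
SETTING = re.compile(r"^\s+([\w.\-]+) = (.*)$")


def effective_values(lines, key="request.timeout.ms"):
    """Count (config class, value) pairs logged for `key` inside config dumps."""
    counts = Counter()
    current = None  # name of the config block being read, if any
    for line in lines:
        start = BLOCK_START.search(line)
        if start:
            current = start.group(1)
            continue
        setting = SETTING.match(line)
        if setting is None:
            current = None  # any other line ends the block
        elif current is not None and setting.group(1) == key:
            counts[(current, setting.group(2).strip())] += 1
    return counts
```

Running `effective_values(open("mm2.log"))` returns a counter keyed by config class and value, which makes it easy to see whether any consumer at all picked up 120000 or whether every ConsumerConfig dump still reports 30000.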

Properties file I am using:

    # specify any number of cluster aliases
    clusters = CLOUD_EU, CLOUD_NA

    # connection information for each cluster
    CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092
    CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092

    # enable and configure individual replication flows
    CLOUD_EU->CLOUD_NA.enabled = true
    CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU
    CLOUD_NA->CLOUD_EU.enabled = false

    replication.factor=3
    tasks.max = 1

    ############################# Internal Topic Settings  #############################
    checkpoints.topic.replication.factor=3
    heartbeats.topic.replication.factor=3
    offset-syncs.topic.replication.factor=3
    offset.storage.replication.factor=3
    status.storage.replication.factor=3
    config.storage.replication.factor=3

    ############################    Kafka Settings    ###################################

    # CLOUD_EU cluster overrides
    CLOUD_EU.consumer.request.timeout.ms=120000
    CLOUD_EU.consumer.session.timeout.ms=150000