Strimzi MirrorMaker2 consumer offsets not getting translated to target cluster


We're using Strimzi MirrorMaker2 to replicate some topics from one Kafka cluster to another (unidirectional) for DR purposes.

As a PoC, I created a fast producer writing to a topic and a slow consumer reading from it, to simulate lag. I wanted to confirm that the latest consumer offset in the source cluster is copied to the target cluster periodically. The objective is that when the primary region goes down, consumers in the DR region can continue from close to the latest offset committed in the source cluster.

However, the consumer offset is not being copied to the target cluster; its value stays at 1.

Below is the configuration applied. The Kafka version is 3.5.1.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: iflight-mm2-cluster
  namespace: kafka
  labels:
    app: iflight-mm2-cluster
spec:
  version: 3.5.1
  replicas: 1
  connectCluster: my-cluster-target
  clusters:
    - alias: my-cluster-source
      bootstrapServers: iflight-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
    - alias: my-cluster-target
      bootstrapServers: iflight-dr-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
      config:
        config.storage.replication.factor: 3
        offset.storage.replication.factor: 3
        status.storage.replication.factor: 3
  mirrors:
    - sourceCluster: my-cluster-source
      targetCluster: my-cluster-target
      sourceConnector:
        tasksMax: 10
        config:
          replication.factor: 3
          offset-syncs.topic.replication.factor: 3
          sync.topic.acls.enabled: "false"
          refresh.topics.enabled: "true"
          refresh.topics.interval.seconds: 5
          sync.group.offsets.enabled: "true"
          emit.checkpoints.enabled: "true"
          replication.policy.separator: ""
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
      heartbeatConnector:
        config:
          heartbeats.topic.replication.factor: 3
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
          replication.policy.separator: ""
      checkpointConnector:
        config:
          checkpoints.topic.replication.factor: 3
          refresh.groups.enabled: "true"
          refresh.groups.interval.seconds: 5
          sync.topics.configs.enabled: "true"
          sync.group.offsets.enabled: "true"
          sync.group.offsets.interval.seconds: 5
          emit.checkpoints.enabled: "true"
          emit.checkpoints.interval.seconds: 5
          replication.policy.separator: ""
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
      topicsPattern: IFLIGHT-.*
      groupsPattern: iflight_.*

Source cluster consumer offset: 2570


Target cluster consumer offset: 1. (The value becomes 51 when I start a consumer against the target cluster to check which offset it resumes from. Since my batchSize is 50, the offset is 51 after the first batch is consumed.)


Is there any configuration I have missed that prevents the consumer offset from being translated?

I have posted another related question in which the same scenario is tried with plain MirrorMaker2. The Strimzi configuration above is the equivalent of the MirrorMaker2 configuration in that question. Please point out anything I have missed.

There are 2 best solutions below

Answer by ssahlender:

We had much the same issue, although our lag was not that high. For us the cause was the offset.lag.max setting, which defaults to 100. We changed it to 0 so that the offset gets synced whenever it changes.

sourceConnector:
  config:
    offset.lag.max: 0
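
For context, here is a sketch of where this setting would sit in the question's resource, assuming the same mirror and cluster aliases as in the question. offset.lag.max is a MirrorSourceConnector option that the Kafka documentation describes as how far out of sync a remote partition may be before it is resynced. The value 0 is the one reported in this answer, not a general recommendation.

```yaml
# Fragment of spec.mirrors from the question; only offset.lag.max is new.
mirrors:
  - sourceCluster: my-cluster-source
    targetCluster: my-cluster-target
    sourceConnector:
      tasksMax: 10
      config:
        replication.factor: 3
        offset-syncs.topic.replication.factor: 3
        # Default is 100; 0 is the value this answer reports using.
        offset.lag.max: 0
```

The remaining sourceConnector, heartbeatConnector and checkpointConnector settings from the question would stay as they are.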

Answer by shiv:

We had a similar problem, and changing replication.policy.separator from "" to "." helped.
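
Applied to the question's configuration, that change might look like the fragment below. This is a sketch only: it assumes the separator is changed in all three connector blocks, which the answer does not specify, and it leaves the IdentityReplicationPolicy class in place as in the question.

```yaml
# Fragment of spec.mirrors from the question; only the separator value changes.
mirrors:
  - sourceCluster: my-cluster-source
    targetCluster: my-cluster-target
    sourceConnector:
      config:
        replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
        replication.policy.separator: "."   # was ""
    heartbeatConnector:
      config:
        replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
        replication.policy.separator: "."   # was ""
    checkpointConnector:
      config:
        replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
        replication.policy.separator: "."   # was ""
```

All other keys in each config block (replication factors, refresh and sync intervals, topicsPattern, groupsPattern) are omitted here for brevity and would be kept unchanged.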