Aiven OpenSearch Sink Connector throwing version conflicts


We are using the Aiven OpenSearch Kafka sink connector to move documents from a Kafka topic to OpenSearch, but we keep seeing the version conflict message below far too often (20-30 times every 10 minutes).

[2023-12-07 04:29:27,381] WARN Encountered a version conflict when executing batch 1426884 of 25 records. Ignoring and will keep an existing record. Error was [index_name/AcAS_NB7SsuIL9t59d8zhg][[topic_name][1]] OpenSearchException[OpenSearch exception [type=version_conflict_engine_exception, reason=[64ae7de6aceb2382704dd17f]: version conflict, current version [5984159] is higher or equal to the one provided [5984159]]] (io.aiven.kafka.connect.opensearch.BulkProcessor)

Note: both versions (the current one and the one provided) are the same.
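To see whether the same document ids keep recurring, the WARN lines can be tallied per id. A minimal Python sketch, assuming the connector log lines are available as strings and the reason text has the format quoted above; `parse_conflict` and `count_conflicts` are hypothetical helper names:

```python
import re
from collections import Counter

# Matches the reason text OpenSearch puts in the conflict message quoted above.
CONFLICT_RE = re.compile(
    r"\[(?P<doc_id>[^\]\[]+)\]: version conflict, "
    r"current version \[(?P<current>\d+)\] is higher or equal to "
    r"the one provided \[(?P<provided>\d+)\]"
)


def parse_conflict(line):
    """Return (doc_id, current_version, provided_version), or None if no match."""
    match = CONFLICT_RE.search(line)
    if match is None:
        return None
    return (
        match.group("doc_id"),
        int(match.group("current")),
        int(match.group("provided")),
    )


def count_conflicts(lines):
    """Count version conflicts per document id across log lines."""
    counts = Counter()
    for line in lines:
        parsed = parse_conflict(line)
        if parsed is not None:
            counts[parsed[0]] += 1
    return counts
```

Ids with a count above 1 would point at repeated writes for the same document rather than one-off conflicts.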

Here is the sink connector config:

{
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "producer.override.buffer.memory": "3145728",
    "behavior.on.null.values": "delete",
    "connection.password": "pass",
    "index.write.method": "upsert",
    "transforms.v2_rf.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.v2_rf.renames": "_id:oid",
    "tasks.max": "3",
    "transforms": "v1_ex,v2_rf",
    "key.ignore": "false",
    "connection.compression": "true",
    "retry.backoff.ms": "400",
    "transforms.v1_ex.field": "fullDocument",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "read.timeout.ms": "15000",
    "behavior.on.version.conflict": "warn",
    "topics": "topic_name",
    "transforms.v1_ex.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "batch.size": "100",
    "connection.username": "user",
    "max.in.flight.requests": "1",
    "key.ignore.id.strategy": "record.key",
    "schema.ignore": "true",
    "key.converter.schemas.enable": "false",
    "flush.timeout.ms": "180000",
    "name": "connector_name",
    "value.converter.schemas.enable": "false",
    "connection.url": "<connection url>"
}

And yes, we need the index write method to be upsert.

Initially we thought this was a throughput issue: OpenSearch couldn't keep up with the rate at which the sink connector pushes data, and so couldn't respond in time (within retry.backoff.ms). So:

  • We reduced the number of in-flight requests (max.in.flight.requests) from 5 to 1. It didn't help.
  • We increased retry.backoff.ms from 100 ms to 400 ms. It made no difference.

Then we enabled DEBUG logs on the sink connector and saw that OpenSearch responds within a few milliseconds. The response below, for example, took 3 ms.

[2023-12-07 04:52:50,402] DEBUG http-outgoing-175 << "{"took":3,"errors":true,"items":[{"index":{"_index":"index_name","_id":"64d092db50dc39f7f0920c13","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[64d092db50dc39f7f0920c13]: version conflict, current version [6064623] is higher or equal to the one provided [6064623]","index":"index_name","shard":"0","index_uuid":"AcAS_NB7SsuIL9t59d8zhg"}}},...
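The 409 entries can be pulled out of a bulk response body along these lines. This is only a sketch: it assumes the full JSON body has been captured (the log excerpt above is truncated), and `conflicting_ids` is a hypothetical helper name.

```python
import json


def conflicting_ids(bulk_response_body):
    """Return the _id of every item in a bulk response that failed with HTTP 409."""
    response = json.loads(bulk_response_body)
    ids = []
    for item in response.get("items", []):
        # Each item is a single-key object keyed by the action name
        # ("index", "update", "delete", ...).
        for result in item.values():
            if result.get("status") == 409:
                ids.append(result["_id"])
    return ids
```

Feeding it the body of each DEBUG response gives the list of documents rejected in that batch, which can then be compared with the ids sent in the preceding request.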

Then we thought maybe the tasks were reading the same offset multiple times. In that case we should see the offset, the conflicting document id, or the version in the logs before the error. But no, we cannot find any earlier submission. Maybe the version logic is broken, but the version appears to be the Kafka offset. That makes it unlikely that the same document id shows up in different partitions at the same offset, since the partition key is the document id itself.
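For reference, the wording "current version [...] is higher or equal to the one provided [...]" is what OpenSearch returns for externally versioned writes, where a write is accepted only if its version is strictly greater than the stored one. A toy Python model of that check, assuming (as suspected above, not confirmed) that the connector sends the Kafka offset as the external version; `external_version_accepts` is a hypothetical name, not part of any API:

```python
def external_version_accepts(current_version, provided_version):
    """Model of the external-versioning check for a single document.

    current_version is None when the document does not exist yet.
    """
    if current_version is None:
        return True
    # Equal versions are rejected: the provided one must be strictly greater.
    return provided_version > current_version


# Versions taken from the first log message in this question.
same_offset_again = external_version_accepts(5984159, 5984159)
next_offset = external_version_accepts(5984159, 5984160)
print(same_offset_again, next_offset)
```

Under that assumption, a second write of the same document with the same offset would be rejected with exactly this message, even though the versions are equal.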

Now, I am running out of ideas and there isn't a dedicated forum to discuss this. Any pointers on what could be the issue here?
