Resume token missing in Kafka-MongoDB source connector stream


I am using Confluent Platform v5.4, MongoDB 3.6, and the MongoDB Kafka source connector. I have configured the source connector to push data into a Kafka topic whenever a new record is created in a MongoDB collection. I have also reduced the Kafka broker's log retention from the default of 7 days to 1 day. My problem is that I repeatedly see errors like this in the MongoDB logs:

2020-09-08T14:58:34.500+0000 I COMMAND  [conn66] command local.oplog.rs command: aggregate { aggregate: "topic_1", pipeline: [ { $changeStream: { fullDocument: "default", resumeAfter: { _data: BinData(0, 825F50DCCE00000008463C5F6964003C35363962633732642D386430352D343034622D616262362D64363632656136663464303000005A1004B31FA99AA5C141CB9EE9AA845877B80D04) } } } ], cursor: {}, $db: "ctc", $clusterTime: { clusterTime: Timestamp(1599577091, 6), signature: { hash: BinData(0, 6550821FD90928ABC3A2DFE066FC34E824A83762), keyId: 6847473605622628353 } }, lsid: { id: UUID("2a2e5878-4743-4e83-a4fd-eed68b5efe02") }, $readPreference: { mode: "primaryPreferred" } } planSummary: COLLSCAN exception: resume of change stream was not possible, as the resume token was not found. {_data: BinData(0, "825F579ACE00000008463C5F6964003C31313632333863352D316561612D343961652D613437632D30366263336131333436313900005A1004B31FA99AA5C141CB9EE9AA845877B80D04")} code:ChangeStreamFatalError numYields:8932 reslen:455 locks:{ Global: { acquireCount: { r: 17872 }, acquireWaitCount: { r: 97 }, timeAcquiringMicros: { r: 350686 } }, Database: { acquireCount: { r: 8936 } }, Collection: { acquireCount: { r: 1 } }, oplog: { acquireCount: { r: 8935 } } } protocol:op_msg 22198ms

I understand that the connector keeps a resume token that ties the Kafka topic to MongoDB's oplog so the change stream can stay up and running. But sometimes the corresponding oplog entry is flushed out (the oplog is a capped collection whose size is only a fraction of the storage allotted to MongoDB), or the data for the Kafka topic is deleted, and the stream breaks.

My question is: how can I avoid the stream breaking? How can I make sure that the resume token is always present in the oplog and the Kafka topic? Is there any way to manually fetch the resume token from wherever it is still available and write it back to the place where it is missing?


There are 2 best solutions below


There has been a change in 4.4-compatible drivers to deal with the change stream resume token falling off the end of the oplog when there are no changes for a long time. The user-facing side of this is described in the Ruby driver documentation; adapt to Java as necessary. The key point is that you need to use tryNext and read the resume token from the change stream cursor itself rather than from the returned change documents.
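
As an illustration, a minimal sketch of that approach with the MongoDB Java driver (assuming a 4.x driver; the connection string is a placeholder, and the database/collection names are taken from the log above):

    import com.mongodb.client.MongoChangeStreamCursor;
    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.changestream.ChangeStreamDocument;
    import org.bson.BsonDocument;
    import org.bson.Document;

    import java.util.concurrent.TimeUnit;

    public class ResumeTokenTail {
        public static void main(String[] args) {
            // Placeholder connection string for this sketch.
            try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
                MongoCollection<Document> coll =
                        client.getDatabase("ctc").getCollection("topic_1");

                // Open the change stream through an explicit cursor so the resume token
                // can be read even when no change documents are returned.
                try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor =
                             coll.watch().maxAwaitTime(5, TimeUnit.SECONDS).cursor()) {
                    while (true) {
                        // tryNext() returns null when no event arrived within maxAwaitTime;
                        // on servers that support it, the cursor's resume token still
                        // advances with each empty batch.
                        ChangeStreamDocument<Document> event = cursor.tryNext();
                        if (event != null) {
                            // process the event ...
                        }
                        // Persist this token, not the _id of the last seen change document.
                        BsonDocument resumeToken = cursor.getResumeToken();
                        // save resumeToken somewhere durable ...
                    }
                }
            }
        }
    }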

If you are already doing all of the above, you need to either:

  • Increase the oplog window and retain more of the oplog (see the sketch after this list), or
  • Process the changes faster so that you stay within the oplog window.
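
For the first option, the oplog can be resized at runtime with the replSetResizeOplog admin command; a hedged sketch using the MongoDB Java driver, where the connection string and the 16000 MB size are only examples:

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import org.bson.Document;

    public class ResizeOplog {
        public static void main(String[] args) {
            // Placeholder connection string; run this against each replica set member
            // whose oplog you want to grow (requires WiredTiger and MongoDB 3.6+).
            try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
                // The new size is given in megabytes; 16000 here is purely illustrative.
                Document result = client.getDatabase("admin").runCommand(
                        new Document("replSetResizeOplog", 1).append("size", 16000.0));
                System.out.println(result.toJson());
            }
        }
    }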

A long-term solution would be to configure heartbeats.
This makes the source connector update its resume token at regular intervals, not only when the contents of your source MongoDB namespace change. See this official MongoDB page for more details: https://www.mongodb.com/docs/kafka-connector/current/troubleshooting/recover-from-invalid-resume-token/#prevention
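
For illustration, the relevant source connector properties might look like this in a .properties-style configuration (the interval and topic name are example values; heartbeats are disabled by default):

    heartbeat.interval.ms=10000
    heartbeat.topic.name=__mongodb_heartbeats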

For a quick, one-time fix, setting the "offset.partition.name" property worked for me. This is also described in the link above.
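
For example, adding (or changing) a property like the following in the source connector configuration makes the connector start from a fresh offset partition, abandoning the stale resume token (the value itself is arbitrary, as long as it differs from the previous one):

    offset.partition.name=mongo-source-restart-2020-09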