I am using Confluent Platform v5.4, MongoDB 3.6, and the MongoDB Kafka connector. I have configured the MongoDB source connector to push data from MongoDB into a Kafka topic whenever a new record is created in a MongoDB collection. I have also configured my Kafka broker to keep the data log for 1 day instead of the default 7 days. My problem is that I repeatedly get errors like this in the MongoDB logs:
2020-09-08T14:58:34.500+0000 I COMMAND [conn66] command local.oplog.rs command: aggregate { aggregate: "topic_1", pipeline: [ { $changeStream: { fullDocument: "default", resumeAfter: { _data: BinData(0, 825F50DCCE00000008463C5F6964003C35363962633732642D386430352D343034622D616262362D64363632656136663464303000005A1004B31FA99AA5C141CB9EE9AA845877B80D04) } } } ], cursor: {}, $db: "ctc", $clusterTime: { clusterTime: Timestamp(1599577091, 6), signature: { hash: BinData(0, 6550821FD90928ABC3A2DFE066FC34E824A83762), keyId: 6847473605622628353 } }, lsid: { id: UUID("2a2e5878-4743-4e83-a4fd-eed68b5efe02") }, $readPreference: { mode: "primaryPreferred" } } planSummary: COLLSCAN exception: resume of change stream was not possible, as the resume token was not found. {_data: BinData(0, "825F579ACE00000008463C5F6964003C31313632333863352D316561612D343961652D613437632D30366263336131333436313900005A1004B31FA99AA5C141CB9EE9AA845877B80D04")} code:ChangeStreamFatalError numYields:8932 reslen:455 locks:{ Global: { acquireCount: { r: 17872 }, acquireWaitCount: { r: 97 }, timeAcquiringMicros: { r: 350686 } }, Database: { acquireCount: { r: 8936 } }, Collection: { acquireCount: { r: 1 } }, oplog: { acquireCount: { r: 8935 } } } protocol:op_msg 22198ms
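For context, the source connector is configured roughly like this (the connection URI, database, collection, and topic names below are placeholders, not my actual values); the broker-side change was simply lowering log.retention.hours from the default 168 to 24:

```json
{
  "name": "mongo-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo-host:27017",
    "database": "mydb",
    "collection": "mycollection",
    "topic.prefix": "mongo"
  }
}
```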
I understand that between the Kafka topic and MongoDB's oplog there is a resume token which is shared to keep the data stream up and running. But sometimes the relevant oplog entry is overwritten (the oplog is a capped collection of limited size, so older entries roll off once it fills up), or the data corresponding to the Kafka topic is deleted by retention, which breaks the stream.
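To see how much history the oplog currently holds, I estimate its window roughly like this (a sketch using the synchronous Java driver; the connection string is a placeholder):

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.BsonTimestamp;
import org.bson.Document;

public class OplogWindow {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> oplog =
                    client.getDatabase("local").getCollection("oplog.rs");

            // Oldest and newest entries of the capped oplog collection.
            Document first = oplog.find().sort(new Document("$natural", 1)).first();
            Document last = oplog.find().sort(new Document("$natural", -1)).first();

            BsonTimestamp firstTs = first.get("ts", BsonTimestamp.class);
            BsonTimestamp lastTs = last.get("ts", BsonTimestamp.class);

            // Seconds of history the oplog currently covers; a resume token older
            // than this window can no longer be used to resume the change stream.
            long windowSeconds = (long) lastTs.getTime() - firstTs.getTime();
            System.out.println("Approximate oplog window: " + windowSeconds + " seconds");
        }
    }
}
```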
My question is: how can I avoid the stream breaking? How can I make sure that the resume token is always present in the oplog and in the Kafka topic? Is there any way I can manually fetch the resume token from wherever it is still available and restore it at the place where it is missing?
There has been a change made in 4.4-compatible drivers to deal with the change stream resume token falling off the end of the oplog when there are no changes for a long time. The user side of this is described here for the Ruby driver; adapt to Java as necessary. The key is that you need to use `tryNext` and read the resume token off the change stream cursor rather than out of the returned change documents (see the sketch below). If you are doing all of the above, you need to either:
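A rough sketch of that pattern with the synchronous Java driver (3.11+); the connection string, database, and collection names are placeholders, not values from the question:

```java
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.Document;

public class ResumeTokenExample {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> collection =
                    client.getDatabase("mydb").getCollection("mycollection");

            BsonDocument lastResumeToken = null; // persist this somewhere durable

            try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor =
                         collection.watch().cursor()) {
                while (true) {
                    // tryNext() returns null instead of blocking when no event is available.
                    ChangeStreamDocument<Document> event = cursor.tryNext();
                    if (event != null) {
                        // handle the change event here ...
                    }
                    // Read the token from the cursor, not from the event document:
                    // the cursor's resume token keeps advancing even when there are
                    // no new events, so it stays inside the oplog window.
                    BsonDocument token = cursor.getResumeToken();
                    if (token != null) {
                        lastResumeToken = token;
                    }
                }
            }
        }
    }
}
```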