Cannot use timestamp from oplog for filtering


I have a stream of oplog messages produced by the MongoDB Kafka connector. I want to tell the connector to start producing messages from a given point in time. I can do it using pipelines, as documented here: https://docs.mongodb.com/kafka-connector/current/kafka-source/

I'm trying to write a query that uses clusterTime for this, but with no luck. The clusterTime is a BSON Timestamp. I'm using MongoDB v4.0.9.

Here's how I test it:

db.gustotest.insertMany([{
    "operationType" : "insert",
    "clusterTime" : {
        "$timestamp" : {
            "t" : 1634824102.0,
            "i" : 1.0
        }
    },
    "fullDocument" : {
        "_id" : {
            "$oid" : "61716fa62b7ffb4a2e01a235"
        },
        "location" : "Location01",
        "organizationID" : "123",
        "created" : {
            "$date" : 1634824102357.0
        }
    },
    "ns" : {
        "db" : "warehouse",
        "coll" : "gustotest"
    },
    "documentKey" : {
        "_id" : {
            "$oid" : "61716fa62b7ffb4a2e01a235"
        }
    }
}])

Once the collection has at least one element, we can try to filter by clusterTime. I've tried many approaches, such as

db.gustotest.aggregate([
  {
    $addFields: {
      convertedDate: { $toDate: { $dateToString: { date: "$clusterTime" } } }
    }
  }
])

or

db.gustotest.aggregate([
  {
    $match: {
      clusterTime: { $gt: Timestamp(0, 0) }
    }
  }
])

Is there any other way to achieve this without $getField or $function, which are only available in newer versions?

I managed to extract the raw timestamp from this field, but only by using the $function feature. Unfortunately, $function is only available in newer MongoDB versions (4.4 and later), so I cannot use it on v4.0.9.

db.gustotest.aggregate([
  {
    $addFields: {
      cTime: {
        $function: {
          body: function(clusterTime) {
            return clusterTime["$timestamp"].t
          },
          args: [ "$clusterTime" ],
          lang: "js"
        }
      }
    }
  }
])

This is the minimal code that can be used to reproduce the problem:

db.mycollection.insertOne(
  {
    "clusterTime": {
      "$timestamp": {
        "t": 120000001,
        "i": 0
      }
    }
  }
)

db.mycollection.aggregate([
  {
    $match: {
      clusterTime: {
        $gt: Timestamp(0, 0)
      }
    }
  }
])

There is 1 best solution below

Joe On

MongoDB is finicky about field names that begin with a dollar sign.

You can use an extra stage to convert that object to an array, so that the field name becomes a value:

db.gustotest.aggregate([
    {$addFields:{
       date:{$arrayElemAt:[{$objectToArray:"$clusterTime"},0]}
    }},
    {$addFields:{
       date:{$toDate:{$multiply:[1000,"$date.v.t"]}}
    }}
])
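
To see what those two stages do to a single document, here is a rough imitation in plain JavaScript (not MongoDB itself, just the same reshaping). The names objectToArray and clusterTimeToDate are made up for this illustration, and it assumes clusterTime really is stored as a sub-document with a "$timestamp" key.

```javascript
// Plain-JavaScript imitation of the two $addFields stages above.

// Mimics $objectToArray: { a: 1 } becomes [ { k: "a", v: 1 } ].
function objectToArray(obj) {
  return Object.entries(obj).map(([k, v]) => ({ k, v }));
}

// Hypothetical helper: applies both stages to one document.
function clusterTimeToDate(doc) {
  // Stage 1: take the first key/value pair, so "$timestamp" is now a value (k)
  // and its contents are reachable through the ordinary field name "v".
  const first = objectToArray(doc.clusterTime)[0];
  // Stage 2: v.t is seconds since the Unix epoch; dates use milliseconds.
  return new Date(first.v.t * 1000);
}

const sample = { clusterTime: { "$timestamp": { t: 1634824102, i: 1 } } };
const converted = clusterTimeToDate(sample);
console.log(converted.toISOString());
```

The point is only that after $objectToArray the path is "$date.v.t", which no longer contains a dollar-prefixed field name.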

EDIT: I had forgotten that $timestamp is a special MongoDB type indicator, so the field is actually stored as a Timestamp rather than as a sub-document. Here is how to convert a Timestamp type to a date:

{$addFields: {
    date: {
        $dateFromString: {
            dateString: {
                $dateToString: {date: "$clusterTime"}
            }
        }
    }
}}
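
If the goal is to start from a given point in time, it may be simpler to compare clusterTime against a cutoff Timestamp than to convert it to a date. Below is a minimal sketch in plain JavaScript that builds such a $match stage as JSON text. It assumes the connector's pipeline setting is parsed as MongoDB Extended JSON, which you should verify for your connector version; matchSince is a hypothetical helper, not part of any MongoDB API.

```javascript
// Hypothetical helper: build a one-stage pipeline, as JSON text, that keeps
// only events whose clusterTime is at or after the given ISO-8601 instant.
function matchSince(isoDate) {
  const millis = Date.parse(isoDate);
  if (Number.isNaN(millis)) {
    throw new Error("Unparseable date: " + isoDate);
  }
  // A Timestamp's "t" is whole seconds since the Unix epoch;
  // "i" is an ordinal within that second, so 0 is the earliest value.
  const seconds = Math.floor(millis / 1000);
  const stage = {
    $match: { clusterTime: { $gte: { $timestamp: { t: seconds, i: 0 } } } },
  };
  return JSON.stringify([stage]);
}

const pipelineJson = matchSince("2021-10-21T13:48:22Z");
console.log(pipelineJson);
```

In the mongo shell the same cutoff would be written as Timestamp(1634824102, 0) inside the $match, in the same way as Timestamp(0, 0) in the question.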
