Using MongoDB Spark Connector to filter based on timestamp


I am using the Spark MongoDB connector to fetch data from MongoDB. However, I cannot work out how to query MongoDB from Spark using an aggregation pipeline (rdd.withPipeline). Following is my code, where I want to fetch records based on a timestamp and store them in a DataFrame:

val appData = MongoSpark.load(spark.sparkContext, readConfig)
val df = appData.withPipeline(Seq(Document.parse("{ $match: { createdAt : { $gt : 2017-01-01 00:00:00 } } }"))).toDF()

Is this the correct way to query MongoDB from Spark on a timestamp value?


There are 3 best solutions below


Try this (but note the limitation that Mongo dates / ISODate only accept ISO-8601 timestamps with a timezone):

option("pipeline", s"""[{ $$match: { "updatedAt" : { $$gte : new ISODate("2022-11-29T15:26:21.556Z") } } }]""").mongo[DeltaComments]

As the comment mentioned, you could utilise the Extended JSON format for the date filter.

val appDataRDD  = MongoSpark.load(sc)
val filteredRDD = appDataRDD.withPipeline(Seq(Document.parse("{$match:{timestamp:{$gt:{$date:'2017-01-01T00:00:00.000'}}}}")))
filteredRDD.foreach(println)

See also MongoDB Spark Connector: Filters and Aggregation for an alternative way to filter.
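As a rough sketch of that alternative, you can load a DataFrame and apply a regular Spark filter instead of an aggregation pipeline. The field name timestamp and the readConfig are assumptions carried over from the question; whether the filter is pushed down to MongoDB depends on the connector version:

// Sketch: filter on the loaded DataFrame rather than via withPipeline.
// Field name and configuration are assumptions based on the question.
import java.sql.Timestamp
import org.apache.spark.sql.functions.col

val appDF = MongoSpark.load(spark, readConfig)
val filteredDF = appDF.filter(col("timestamp") > Timestamp.valueOf("2017-01-01 00:00:00"))
filteredDF.show()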


Try this:

val pipeline = "{'$match': {'CreationDate':{$gt: {$date:'2020-08-26T00:00:00.000Z'}}}}"

val sourceDF = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", "mongodb://administrator:[email protected]:27017/?authSource=admin")
  .option("database", "_poc")
  .option("collection", "activity")
  .option("pipeline", pipeline)
  .load()
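If you need a bounded time window rather than an open-ended $gt, the same pipeline option accepts a range match. The dates below are just example values:

// Example: match a bounded date range instead of an open-ended $gt (dates are placeholders).
val rangePipeline = "{'$match': {'CreationDate': {$gte: {$date:'2020-08-01T00:00:00.000Z'}, $lt: {$date:'2020-09-01T00:00:00.000Z'}}}}"
// Pass it the same way as above: .option("pipeline", rangePipeline)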