Spring WebFlux + MongoDB: Tailable Cursor and Aggregation


I'm new to WebFlux and MongoDB. I'm trying to use aggregation on a capped collection with a tailable cursor, but without success. I'd like to execute this MongoDB query:

db.structures.aggregate(
   [
     {
        $match: {
            id: { $in: [8244, 8052]}
        }    
     },  
     { $sort: { id: 1, lastUpdate: 1} },
     {
       $group:
         {
           _id: {id: "$id"},
           lastUpdate: { $last: "$lastUpdate" }
         }
     }
   ]
)

ReactiveMongoOperations gives me the option to "tail" or to "aggregate".

I'm able to execute the aggregation:

    MatchOperation match = Aggregation.match(Criteria.where("id").in(8244, 8052));
    SortOperation sort = Aggregation.sort(Sort.by("id", "lastUpdate"));
    GroupOperation group = Aggregation.group("id").last("lastUpdate").as("lastUpdate");
    Aggregation aggregate = Aggregation.newAggregation(match, sort, group);

    Flux<Structure> result = mongoOperation.aggregate(aggregate,
            "structures", Structure.class);

Or the tailable cursor:

    Query query = new Query();
    query.addCriteria(Criteria.where("id").in(8244, 8052));
    Flux<Structure> result = mongoOperation.tail(query, Structure.class);

Is it possible to combine them? Tail and aggregation together?

Using aggregation was the way I found to get only the last inserted document for each id.

Without aggregation I get:

[screenshot: query results without aggregation]

With aggregation:

[screenshot: query results with aggregation]

Thanks in advance


Answer by NikolaB:

The tailable cursor query creates a Flux that never completes (it never emits an onComplete event) and that emits records as they are inserted into the database. Because of that, I would expect aggregations not to be allowed by the database engine on a tailable cursor.

So the aggregation doesn't quite make sense here: on every newly inserted record the aggregation would need to be recomputed. Technically you can do a running aggregation, where for every returned record you compute the desired aggregate record and send it downstream.
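To illustrate what such a running aggregation does per id, here is a plain-Java sketch (outside Reactor, so it is runnable on its own) that keeps the latest document per id as inserts arrive; `Structure` here is a minimal hypothetical stand-in for the asker's class, not the real one:

```java
import java.util.HashMap;
import java.util.Map;

// Running "last update per id" aggregation over a stream of inserts,
// mirroring what scan() does for each grouped Flux.
class LastUpdateTracker {
    // Hypothetical minimal stand-in for the asker's Structure document.
    record Structure(int id, long lastUpdate) {}

    private final Map<Integer, Structure> latest = new HashMap<>();

    // Called once per inserted document, in insertion order (as a tailable
    // cursor would emit them). Returns the current winner for that id.
    Structure onInsert(Structure next) {
        return latest.merge(next.id(), next,
                (current, incoming) ->
                        incoming.lastUpdate() >= current.lastUpdate() ? incoming : current);
    }
}
```

Each `onInsert` call corresponds to one emission of the grouped Flux; the map plays the role of the accumulator that `scan` threads through the stream.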

One possible solution is to do the aggregation programmatically on the returned "infinite" Flux:

    mongoOperation.tail(query, Structure.class)
        .groupBy(Structure::getId)             // one independent inner Flux per id
        .flatMap(groupedFlux -> groupedFlux
            .scan((result, nextStructure) -> { // scan is like reduce but emits intermediate results
                log.info("intermediate result is: {}", result);
                // keep whichever document has the later lastUpdate
                return result.getLastUpdate() > nextStructure.getLastUpdate()
                        ? result
                        : nextStructure;
            }));

On the other hand, you should probably revisit your use case and what you need to accomplish here, and see whether something other than a capped collection should be used, or whether the aggregation part is redundant (i.e. if newly inserted records always have a lastUpdate larger than the previous record's, the tail query alone already gives you the latest document per id).