Spring data mongo resume token for MessageListenerContainer

545 Views Asked by At

Hi I am trying to implement MessageListener from mongo oplog and it should retrieve the document from last stopped streaming document. Currently I have the code set up as below. Not sure how to retrieve resumeToken from last document and set it so that if listener application goes down and comes back online, it should read after the last read.

    @Bean
    MessageListenerContainer candidateMessageListenerContainer(MongoTemplate mongoTemplate, @Qualifier("candidateMessageListener") MessageListener documentMessageListener)
    {
    Executor executor = Executors.newSingleThreadExecutor();
    MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate, executor)
    {
        @Override
        public boolean isAutoStartup()
        {
        return true;
        }
    };
    ChangeStreamRequest<Candidate> request = ChangeStreamRequest.builder(documentMessageListener)
                    .collection("candidate") // The name of the collection to listen to , Do not specify the default listening database
                    .filter(newAggregation(match(where("operationType").in("insert", "update", "replace", "delete")))) // Filter the types of operations that need to be monitored , You can specify filter conditions according to your needs
                    .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
                    // When not set , When the document is updated , Only information about the changed fields will be sent , Set up UPDATE_LOOKUP All information of the document will be returned
                    .build();
    messageListenerContainer.register(request, Candidate.class);
    return messageListenerContainer;
    }

1

There are 1 best solutions below

2
On

You can use the getResumeToken() or getTimestamp() methods on ChangeStreamEvent to obtain this information. See the Reference Documentation - Resuming Change Streams for details.