Using MongoTemplate (or MongoRepository) with an ExecutorService to update all documents in a Mongo collection that match certain criteria does not work as expected: some documents are never found and updated. A single-threaded approach works fine, but takes a few hours to complete.
I am running this in a Spring Boot application using an implementation of CommandLineRunner (it is meant to be run as an update script on multiple environments, with output written to a log file for future reference).
I tried both MongoTemplate, calling save() on each document, and MongoRepository, calling saveAll() on a list of modified documents. In both cases some documents remain unmodified (I can see this after a run by comparing the MongoDB data with the script output). The numberOfDocuments and numberOfPages values are correct and the output lists every page as started and finished, yet some documents are still not updated.
Again, the same code without an ExecutorService works exactly as expected, but it takes too long. My thinking was that each page and each document is independent of the others, so a multithreaded approach should be possible.
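The independence assumption can be made concrete with a small, Mongo-free sketch. It is only an illustration: the class `PageBounds` and its methods are hypothetical names, and a range of indexes stands in for the query result. It suggests that the page count and per-page offsets used in the code below cover every index exactly once, provided the matching set does not change while the pages are being read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Data: it mirrors the skip/limit
// arithmetic implied by PageRequest.of(pageNumber, pageSize).
public class PageBounds {

    // Number of pages needed to cover totalItems, rounding up.
    static long numberOfPages(long totalItems, int pageSize) {
        return (totalItems + pageSize - 1) / pageSize;
    }

    // Offset of the first item on a zero-based page.
    static long offset(long pageNumber, int pageSize) {
        return pageNumber * pageSize;
    }

    // Collects the indexes every page would read from a stable result set.
    static List<Long> coveredIndexes(long totalItems, int pageSize) {
        List<Long> covered = new ArrayList<>();
        long pages = numberOfPages(totalItems, pageSize);
        for (long page = 0; page < pages; page++) {
            long start = offset(page, pageSize);
            long end = Math.min(start + pageSize, totalItems);
            for (long i = start; i < end; i++) {
                covered.add(i);
            }
        }
        return covered;
    }

    public static void main(String[] args) {
        System.out.println("pages: " + numberOfPages(2500, 1000));
        System.out.println("indexes covered: " + coveredIndexes(2500, 1000).size());
    }
}
```

The key condition is the stable result set: the arithmetic alone says nothing about what happens if documents enter or leave the set between page reads.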
@Component
public class SomeCommand implements CommandLineRunner {

    private static final Logger LOG = LoggerFactory.getLogger(SomeCommand.class);
    private static final int PAGE_SIZE = 1000;

    private final MongoTemplate mongoTemplate;
    // other dependencies...

    public SomeCommand(MongoTemplate mongoTemplate /*other dependencies...*/) {
        this.mongoTemplate = mongoTemplate;
        // other dependencies...
    }

    @Override
    public void run(String... args) {
        Query query = Query.query(Criteria.where("someField").is("someValue")
                .andOperator(Criteria.where("anotherField").exists(false)));
        long numberOfDocuments = mongoTemplate.count(query, SomeClass.class);
        long numberOfPages = (long) Math.ceil((double) numberOfDocuments / PAGE_SIZE);

        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < numberOfPages; i++) {
            int pageNumber = i;
            CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
                LOG.info(">>>>>>>>>> Started processing for PAGE " + (pageNumber + 1) + "/" + numberOfPages + " <<<<<<<<<<");
                updatePage(query, pageNumber);
                LOG.info(">>>>>>>>>> Finished processing for PAGE " + (pageNumber + 1) + "/" + numberOfPages + " <<<<<<<<<<");
            }, executorService);
            tasks.add(task);
        }
        tasks.forEach(CompletableFuture::join);
        executorService.shutdown();
    }

    private void updatePage(Query query, int pageNumber) {
        Pageable pageable = PageRequest.of(pageNumber, PAGE_SIZE);
        List<SomeClass> list = mongoTemplate.find(query.with(pageable), SomeClass.class);
        for (SomeClass doc : list) {
            // additional checks, if conditions are met, then instance is updated
            // with a new property read from a different data source
            mongoTemplate.save(doc);
        }
    }
}
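One thing that may be worth ruling out, sketched here with an in-memory list instead of Mongo: the query filters on `anotherField` not existing, so if the update is what sets `anotherField` (an assumption, since the update logic is elided above), each saved document drops out of the result set and later skip offsets are computed against a smaller set. The `ShrinkingPages` class and all its names are hypothetical. The simulation processes pages one after another purely to stay deterministic; with ten threads the interleaving would vary, but the same effect could apply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-in for the collection; not Spring Data code.
public class ShrinkingPages {

    // A document matches the filter only while it has not been updated yet.
    static final class Doc {
        final int id;
        boolean updated;

        Doc(int id) {
            this.id = id;
        }
    }

    // Re-evaluates the filter on every call, then applies skip/limit,
    // which is how a paged find against a live collection behaves.
    static List<Doc> findPage(List<Doc> collection, int pageNumber, int pageSize) {
        return collection.stream()
                .filter(d -> !d.updated)
                .skip((long) pageNumber * pageSize)
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Counts pages up front, then updates page by page.
    // Returns how many documents were actually updated.
    static int updateAll(int totalDocs, int pageSize) {
        List<Doc> collection = new ArrayList<>();
        for (int i = 0; i < totalDocs; i++) {
            collection.add(new Doc(i));
        }

        int numberOfPages = (totalDocs + pageSize - 1) / pageSize;
        int updatedCount = 0;
        for (int page = 0; page < numberOfPages; page++) {
            for (Doc doc : findPage(collection, page, pageSize)) {
                doc.updated = true; // the document no longer matches the filter
                updatedCount++;
            }
        }
        return updatedCount;
    }

    public static void main(String[] args) {
        System.out.println(updateAll(4000, 1000) + " of 4000 documents updated");
    }
}
```

In this simulation only 2000 of the 4000 documents are reached, even though every page is reported as processed. If this is the cause, repeatedly reading the first page until it comes back empty, or paging over a stable key such as `_id`, would avoid the moving offsets; the code above does not confirm whether that matches the real update logic. Separately, `Query.with(Pageable)` modifies the `Query` instance it is called on, and the same `query` object is shared by all tasks here, so building a fresh `Query` per page may also be worth trying.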