I previously built an integration flow that uses a MongoDb inboundChannelAdapter with an update expression, and made its poller transactional:
IntegrationFlow
        .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                        .collectionName("work").entityClass(Document.class)
                        .update(Update.update("status", "P")),
                p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
This poller would fetch a list of documents, take a write lock on them, and split() them before processing started. If all went well, the transaction committed at the end of the flow, where an outboundGateway set the records to DONE.
The purpose of the transaction and the update expression is to stop other polling threads from grabbing records that are already being processed. This works, but if any single document in the list already has a write lock on it, the entire poll fails, since it's one transaction over the whole list.
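The difference between one transaction over the whole polled list and one transaction per document can be sketched with a small in-memory model. This is plain Java, not Spring Integration or MongoDB: the hypothetical `locked` set stands in for documents another transaction has already write-locked, and `ClaimStrategies`, `claimBatch`, and `claimPerDocument` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in: `locked` plays the role of documents that
// another MongoDB transaction has already write-locked.
class ClaimStrategies {

    // One transaction for the whole polled list (the first flow):
    // any conflict aborts the batch, so nothing is claimed.
    static List<String> claimBatch(List<String> polled, Set<String> locked) {
        for (String id : polled) {
            if (locked.contains(id)) {
                return List.of(); // a single WriteConflict rolls back everything
            }
        }
        return List.copyOf(polled);
    }

    // One transaction per document (the idea behind the second flow):
    // a conflict only skips that document and moves on to the next one.
    static List<String> claimPerDocument(List<String> polled, Set<String> locked) {
        List<String> claimed = new ArrayList<>();
        for (String id : polled) {
            if (!locked.contains(id)) {
                claimed.add(id);
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        List<String> polled = List.of("a", "b", "c");
        Set<String> locked = Set.of("b"); // "b" is already being processed elsewhere
        System.out.println(claimBatch(polled, locked));       // []
        System.out.println(claimPerDocument(polled, locked)); // [a, c]
    }
}
```

The batch version is what makes one busy document block the whole poll; the per-document version trades that for more, smaller transactions.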
In my next iteration, I instead start a transaction after the split(), one per document. That way, if the write-lock attempt fails for one document, the flow can move on to the next document in the list and try to lock that one. Taking it one step further, each document could be processed in parallel on its own thread, each with its own transaction. My attempt looked like this:
@Bean
public IntegrationFlow mongoFlowTxnPerDoc(MongoTemplate mongoTemplate, TransactionManager tm) {
    return IntegrationFlow
            // poll READY documents without updating them; the poller is not transactional
            .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'TREADY'}")
                            .collectionName("work").entityClass(Document.class),
                    p -> p.poller(pm -> pm.fixedRate(1000L)))
            .split()
            // hand each document to a pool of 3 threads that feeds the txProcess flow
            .channel(c -> c.executor("txProcess.input", Executors.newFixedThreadPool(3)))
            .get();
}
This fetches a list of records without trying to update anything yet. Instead, each document is sent to an executor channel.
Downstream, I use e -> e.transactional(true) on the outboundGateway, which runs findOneAndUpdate on the single document:
@Bean
public IntegrationFlow txProcess(MongoTemplate mongoTemplate) {
    return f -> f
            // set this one document to DONE; transactional(true) wraps the handling in a transaction
            .handle(MongoDb.outboundGateway(mongoTemplate).collectionName("work").entityClass(Document.class)
                            .collectionCallback((c, m) ->
                                    c.findOneAndUpdate(Filters.eq("uuid", ((Document) m.getPayload()).get("uuid")),
                                            Updates.set("status", "DONE"))),
                    e -> e.transactional(true))
            .<Document>handle((p, h) -> {
                System.out.println("-----PROCESSING-----");
                System.out.println(p);
                // simulate something that takes time
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                }
                return null;
            });
}
However, I'm seeing strange behavior and I'm not sure what's happening or what I'm doing wrong.
If I insert one or two messages, it seems to behave as expected. The output shows the message being worked on, and the transaction eventually commits with the status set to DONE. I also see WriteConflict errors from the poller every second, which is expected, since those two records are the only ones in the collection and there is nothing else to move on to. The output looks something like:
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba6, status=TREADY, message=hello, uuid=345eb720-5a6b-4dda-ab4e-24ac8a31b67a}}
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba7, status=TREADY, message=hello, uuid=230ad5d3-4c21-40ff-b75d-05a227d34fec}}
If I insert three or more records, the WriteConflict errors from polling stop. It's almost as if the poller is no longer running, or is blocked on something. The initial output for the records being processed looks fine:
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba6, status=TREADY, message=hello, uuid=345eb720-5a6b-4dda-ab4e-24ac8a31b67a}}
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba7, status=TREADY, message=hello, uuid=230ad5d3-4c21-40ff-b75d-05a227d34fec}}
-----PROCESSING-----
Document{{_id=65bc00ff5a057207a14e2ba4, status=TREADY, message=hello, uuid=31988399-7b1e-4f5e-832f-53a6baa7883d}}
However, after those finish, something strange happens: the flow gets stuck in a loop, processing the same messages it has already completed. You can tell because the status is now DONE instead of TREADY:
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba6, status=DONE, message=hello, uuid=345eb720-5a6b-4dda-ab4e-24ac8a31b67a}}
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba7, status=DONE, message=hello, uuid=230ad5d3-4c21-40ff-b75d-05a227d34fec}}
-----PROCESSING-----
Document{{_id=65bc00ff5a057207a14e2ba4, status=DONE, message=hello, uuid=31988399-7b1e-4f5e-832f-53a6baa7883d}}
This carries on until I kill the application. My guess is that the transactions stay open and never commit at the end of the flow, but it could be something else.
EDIT: The transactional part may be a red herring. Even if I remove transactional(true), the flow still starts looping on DONE messages. I must be misusing the executor channel.
EDIT2: I think what's happening is this: when records are re-polled and submitted to the fixed thread pool executor, they are put into a blocking queue. If a re-polled document stays in the queue long enough for the original thread to finish and commit it as DONE, it no longer gets a write-conflict error by the time it reaches findOneAndUpdate. Also, the queue eventually reaches capacity and starts blocking the polling thread from adding any more, which I think is why polling seems to pause for a while. I'm leaving this unanswered to get others' perspectives.
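The re-poll mechanism described in EDIT2 can be checked with a small, deterministic model in plain Java (no Spring or MongoDB). It is a sketch under stated assumptions: one tick is one second, the poll reads only committed data, a re-polled document whose transaction is still open fails fast with a write conflict, and each worker holds its transaction for the whole 10-second sleep. `RepollSimulation`, `run`, and `claimOnPoll` are illustrative names. The recorded status is the one seen before the worker's own update, like the document findOneAndUpdate returns by default (`ReturnDocument.BEFORE`). The queue is unbounded because `Executors.newFixedThreadPool` is backed by an unbounded `LinkedBlockingQueue`, so in this model the poller never blocks; the write conflicts stop simply because no worker is free to attempt the update.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical discrete-time model (1 tick = 1 second) of the second flow:
// a fixed-rate poller, an executor channel with an unbounded FIFO queue, and
// workers that keep a per-document transaction open for busyTicks.
class RepollSimulation {

    // seenStatus is the committed status before this worker's own update.
    record Processed(String id, String seenStatus) {}

    static final class Result {
        final List<Processed> processed = new ArrayList<>();
        int writeConflicts;
    }

    static Result run(int docCount, int workers, int busyTicks, int ticks, boolean claimOnPoll) {
        Map<String, String> committed = new LinkedHashMap<>();    // committed status per document
        Map<String, Integer> openTxUntil = new LinkedHashMap<>(); // document -> tick its transaction commits
        for (int i = 0; i < docCount; i++) {
            committed.put("doc" + i, "TREADY");
        }
        Deque<String> queue = new ArrayDeque<>(); // unbounded: adding never blocks the poller
        int[] freeAt = new int[workers];           // tick at which each worker is free again
        Result result = new Result();

        for (int t = 0; t < ticks; t++) {
            // 1. Commit every transaction that finishes at this tick.
            Iterator<Map.Entry<String, Integer>> it = openTxUntil.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Integer> tx = it.next();
                if (tx.getValue() == t) {
                    committed.put(tx.getKey(), "DONE");
                    it.remove();
                }
            }
            // 2. Poll: enqueue every document whose committed status is TREADY.
            //    With claimOnPoll, the poll also commits status P right away.
            for (Map.Entry<String, String> doc : committed.entrySet()) {
                if (doc.getValue().equals("TREADY")) {
                    queue.add(doc.getKey());
                    if (claimOnPoll) {
                        doc.setValue("P");
                    }
                }
            }
            // 3. Each free worker takes queued documents and tries findOneAndUpdate.
            for (int w = 0; w < workers; w++) {
                while (freeAt[w] <= t && !queue.isEmpty()) {
                    String id = queue.poll();
                    if (openTxUntil.containsKey(id)) {
                        result.writeConflicts++; // still locked by another open transaction
                        continue;                // fails fast; the worker stays free
                    }
                    result.processed.add(new Processed(id, committed.get(id)));
                    openTxUntil.put(id, t + busyTicks);
                    freeAt[w] = t + busyTicks;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (int docs = 2; docs <= 3; docs++) {
            Result r = run(docs, 3, 10, 25, false);
            System.out.println(docs + " docs: processed=" + r.processed.size()
                    + ", conflicts=" + r.writeConflicts);
        }
    }
}
```

With 2 documents and 3 workers, `run(2, 3, 10, 25, false)` reports 2 processed documents and 18 write conflicts, matching the one-or-two-record behavior. With 3 documents, `run(3, 3, 10, 25, false)` reports 9 processed documents, 6 of them already DONE, and no conflicts. Setting `claimOnPoll` to `true` models claiming documents in the poll itself, as the first flow's `.update(...)` did, and only the 3 original documents get processed.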
I think the solution for you should look like this:
Do not use fixedRate(), so that this channel adapter doesn't poll for new records until you are done with the current set. fixedRate starts a new polling task in parallel with the one that has already started. Then, when you put the result of the MongoDB channel adapter (with its update) into that new executor channel before the splitter, the original transaction is forced to commit, simply because we leave the current thread.
This way, the just-pulled documents are updated, so the next polling cycle skips them.
The rest of the per-message logic is fine. You can still keep transactional(true) on the MongoDb.outboundGateway() at the end.
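Read literally, the suggestion above might be wired roughly as follows. This is a sketch, not a tested configuration: it assumes the same `work` collection, a MongoDB-capable `transactionManager` bean (required by the no-argument `transactional()`), and the `txProcess` flow from the question. The class name `ClaimOnPollConfig`, the bean name `mongoFlowClaimOnPoll`, and reusing the status `P` from the first flow are placeholders.

```java
import java.util.concurrent.Executors;

import org.bson.Document;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.mongodb.dsl.MongoDb;

@Configuration
class ClaimOnPollConfig {

    @Bean
    IntegrationFlow mongoFlowClaimOnPoll(MongoTemplate mongoTemplate) {
        return IntegrationFlow
                .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'TREADY'}")
                                .collectionName("work")
                                .entityClass(Document.class)
                                // Mark the polled documents so later polls skip them.
                                .update(Update.update("status", "P")),
                        // fixedDelay: the next poll starts 1 s after the previous one returns.
                        p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
                // Handing off to another thread lets the poll return, so the poll
                // transaction (and the P status) commits once the hand-off is done.
                .channel(c -> c.executor(Executors.newFixedThreadPool(3)))
                .split()
                // Per-document processing, e.g. the txProcess flow from the question.
                .channel("txProcess.input")
                .get();
    }
}
```

Because the hand-off returns right away, the delay does not wait for downstream processing; it is the committed `P` status that keeps the next poll from returning the same documents. With the executor channel before the splitter, the documents from one poll are processed one after another on whichever pool thread received that batch.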