We are implementing a microservice that, at certain times of the day (driven by a cron scheduler), picks up a batch of records from a database table, processes them, saves the processing results back to the database, and emits some events to Kafka. The microservice is built with Apache Camel; the Camel components we use are Camel Quartz, Camel JPA and Camel Kafka.
The route looks similar to the one below:
from("quartz://database/message-retry-scheduler?cron=0/5 * * * * ?")
    .autoStartup(true)
    .routeId("message-retry-route")
    .transacted()
    .choice()
        .when(retryCheckPredicate)
            .log("Message retry scheduler is running and will check for retryable messages.")
            .to("jpa:" + Retryable.class.getCanonicalName()
                + "?nativeQuery=SELECT * FROM my_retryable WHERE is_retryable ORDER BY event_time ASC FOR UPDATE SKIP LOCKED"
                + "&consumeDelete=false"
                + "&maximumResults=10")
            .split(body()).streaming()
                .setHeader("reprocessedMessage", () -> true)
                .process(myProcessor)
                .log("Started reprocessing message with correlationId=${header.correlationId}")
                .to("direct:process-message")
            .endChoice()
    .end();

from("direct:process-message")
    .routeId("message-processing-subroute")
    // More processing happens here,
    // including saving all created entities to the database.
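The trigger uses the Quartz cron expression `0/5 * * * * ?`. Quartz cron expressions start with a seconds field, so the first field `0/5` means "starting at second 0, fire every 5 seconds". The sketch below uses a hypothetical helper (not part of Quartz) that expands only the simple `start/increment` form of a single field, to make the resulting firing seconds visible.

```java
import java.util.ArrayList;
import java.util.List;

public class CronFieldDemo {

    // Hypothetical helper: expands a Quartz-style "start/increment" field
    // (e.g. "0/5") into the concrete values it matches within [0, max].
    // It handles only this one form; real Quartz parsing supports much more.
    static List<Integer> expandIncrement(String field, int max) {
        String[] parts = field.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        List<Integer> values = new ArrayList<>();
        for (int v = start; v <= max; v += step) {
            values.add(v);
        }
        return values;
    }

    public static void main(String[] args) {
        // The first field of "0/5 * * * * ?" is the seconds field (0-59).
        String secondsField = "0/5 * * * * ?".split(" ")[0];
        System.out.println(expandIncrement(secondsField, 59));
        // prints [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]
    }
}
```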
As you can see, the route above is a retry (reprocessing) mechanism. We introduced it because we noticed that when our DevOps team patches the Kafka servers, the system sometimes fails to publish the events we create. When this happens, we save the incoming messages to the database and try to process them again later.
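A minimal, framework-free sketch of the fallback described above, under stated assumptions: `EventPublisher` is a hypothetical interface standing in for the Kafka producer (it throws when Kafka is unavailable), and an in-memory list stands in for the `my_retryable` table. On a publish failure the message is stored as retryable; the retry side then takes the oldest retryable rows first, capped at a batch size, mirroring `ORDER BY event_time ASC` and `maximumResults=10` in the route.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class RetryFallbackSketch {

    // Hypothetical stand-in for a row in the my_retryable table.
    record RetryableRow(String correlationId, Instant eventTime, boolean retryable) {}

    // Hypothetical publisher abstraction; a real one would wrap the Kafka producer.
    interface EventPublisher {
        void publish(String correlationId) throws Exception;
    }

    // In-memory stand-in for the database table.
    private final List<RetryableRow> table = new ArrayList<>();
    private final EventPublisher publisher;

    RetryFallbackSketch(EventPublisher publisher) {
        this.publisher = publisher;
    }

    // Normal path: try to publish; if that fails, save the message for a later retry.
    void handle(String correlationId, Instant eventTime) {
        try {
            publisher.publish(correlationId);
        } catch (Exception e) {
            table.add(new RetryableRow(correlationId, eventTime, true));
        }
    }

    // Retry path: oldest retryable rows first, at most maxResults of them.
    List<RetryableRow> nextBatch(int maxResults) {
        return table.stream()
                .filter(RetryableRow::retryable)
                .sorted(Comparator.comparing(RetryableRow::eventTime))
                .limit(maxResults)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Simulate Kafka being down for every publish attempt.
        RetryFallbackSketch sketch = new RetryFallbackSketch(id -> {
            throw new IllegalStateException("Kafka unavailable");
        });
        sketch.handle("b", Instant.parse("2024-01-01T10:00:05Z"));
        sketch.handle("a", Instant.parse("2024-01-01T10:00:00Z"));
        sketch.handle("c", Instant.parse("2024-01-01T10:00:10Z"));
        // Prints the two oldest stored rows: "a" first, then "b".
        System.out.println(sketch.nextBatch(2));
    }
}
```

When the publisher succeeds nothing is stored, so the retry batch stays empty; only failed publishes become candidates for reprocessing.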
The message-processing-subroute is invoked both from the main (normal) processing route and from this reprocessing route.
The FOR UPDATE SKIP LOCKED clause is needed because we run multiple instances of the same microservice, and we want to prevent more than one instance from picking up the same message for reprocessing.
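To illustrate the semantics being relied on here, the following is an in-memory simulation of `SELECT ... FOR UPDATE SKIP LOCKED` in plain Java; it is not how any database actually implements the clause. Each hypothetical `Row` records which transaction holds its lock; a claiming instance walks the rows in `event_time` order, takes each free lock, skips (rather than waits on) rows another transaction already holds, and stops after `maxResults`. All class and method names are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class SkipLockedSimulation {

    // A row whose lock is held by at most one transaction (identified by name).
    static final class Row {
        final String id;
        final AtomicReference<String> lockOwner = new AtomicReference<>();

        Row(String id) {
            this.id = id;
        }
    }

    // Rows are assumed to be inserted already sorted by event_time ascending.
    private final List<Row> rows = new ArrayList<>();

    void insert(String id) {
        rows.add(new Row(id));
    }

    // Claim up to maxResults unlocked rows for the given transaction,
    // skipping rows that are locked by another transaction.
    List<String> claim(String txId, int maxResults) {
        List<String> claimed = new ArrayList<>();
        for (Row row : rows) {
            if (claimed.size() == maxResults) {
                break;
            }
            if (row.lockOwner.compareAndSet(null, txId)) {
                claimed.add(row.id);
            }
        }
        return claimed;
    }

    // Commit or rollback: release every lock held by this transaction.
    void release(String txId) {
        for (Row row : rows) {
            row.lockOwner.compareAndSet(txId, null);
        }
    }

    public static void main(String[] args) {
        SkipLockedSimulation table = new SkipLockedSimulation();
        for (int i = 1; i <= 5; i++) {
            table.insert("msg-" + i);
        }
        // Two service instances poll while both transactions are still open.
        List<String> first = table.claim("instance-A", 3);
        List<String> second = table.claim("instance-B", 3);
        System.out.println(first);  // [msg-1, msg-2, msg-3]
        System.out.println(second); // [msg-4, msg-5]
    }
}
```

The two instances end up with disjoint batches, and the rows claimed by one transaction stay unavailable to others until that transaction releases them.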
We can see in the application logs that processing runs to completion as expected, but the route never completes and hangs forever. When we suspended execution in the IntelliJ debugger, we found that it hangs while trying to commit the transaction.
Changing the query to pick up only one message rather than a batch of ten, and removing .split(body()).streaming() from the route definition, makes the route run fine and processing completes.
We tried removing streaming(), and we also tried adding shareUnitOfWork(true) and synchronous() after split(body()), both separately and together. Nothing worked.
I am not sure what is causing this or how to fix it. Processing one message at a time would solve the issue, but we would like to understand where the mistake is, and we would prefer to process a batch of messages in one go rather than one by one.
Thank you in advance for your input.