I have multiple messages which should be sent to a MQ 'foo'. After sending a message from 'route-1', the program should wait for a reply on MQ 'bar'. Only after getting a reply on queue 'bar', the next message should be sent.
I have followed the steps given in the documentation. All the messages are getting routed and then the route is getting suspended.
input.txt
msg1,msg2,msg3,msg4,msg5
beans.xml
<camelContext id="orc-1" xmlns="http://camel.apache.org/schema/spring">
<route id="route-1">
<from uri="file://D://camelInput?fileName=input.txt" />
<split>
<tokenize token="," />
<log message="${body}" />
<to uri="jms:queue:foo" />
<process ref="postProcessor" />
</split>
</route>
<route id="route-2">
<from uri="jms:queue:bar" />
<!--restart route-1-->
</route>
</camelContext>
PostProcessor.java
public class PostProcessor implements Processor {
Thread stop;
@Override
public void process(Exchange exchange) throws Exception {
// stop this route using a thread that will stop
// this route gracefully while we are still running
if (stop == null) {
stop = new Thread() {
@Override
public void run() {
try {
exchange.getContext().getRouteController().suspendRoute("route-1");
} catch (Exception e) {
// ignore
}
}
};
}
// start the thread that stops this route
stop.start();
}
}
logs
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.7.11)
2023-05-22 11:01:52.812 INFO 23484 --- [ main] com.persistent.camel.CamelApplication : Starting CamelApplication using Java 14.0.2 on PSL-5Q47XM3 with PID 23484 (D:\camel\camel\target\classes started by shashank_mugatkar in D:\camel\camel)
2023-05-22 11:01:52.815 INFO 23484 --- [ main] com.persistent.camel.CamelApplication : No active profile set, falling back to 1 default profile: "default"
2023-05-22 11:01:54.951 INFO 23484 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 3.20.3 (orc-1) is starting
2023-05-22 11:01:55.146 INFO 23484 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Routes startup (started:3)
2023-05-22 11:01:55.147 INFO 23484 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Started route-1 (file://D://camelInput)
2023-05-22 11:01:55.147 INFO 23484 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Started route-2 (jms://queue:bar)
2023-05-22 11:01:55.147 INFO 23484 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Started route-3 (direct://suspendRoute1)
2023-05-22 11:01:55.147 INFO 23484 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 3.20.3 (orc-1) started in 453ms (build:64ms init:195ms start:194ms)
2023-05-22 11:01:55.157 INFO 23484 --- [ main] com.persistent.camel.CamelApplication : Started CamelApplication in 2.742 seconds (JVM running for 3.157)
2023-05-22 11:01:56.007 INFO 23484 --- [/D://camelInput] route-1 : msg0
2023-05-22 11:01:56.035 INFO 23484 --- [/D://camelInput] route-1 : msg1
2023-05-22 11:01:56.040 INFO 23484 --- [ - ShutdownTask] o.a.c.i.engine.DefaultShutdownStrategy : Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 45 seconds. Inflights per route: [route-1 = 2]
2023-05-22 11:01:56.050 ERROR 23484 --- [/D://camelInput] o.a.c.p.e.DefaultErrorHandler : Failed delivery for (MessageId: 23D21B5DF3A3295-0000000000000001 on ExchangeId: 23D21B5DF3A3295-0000000000000001). Exhausted after delivery attempt: 1 caught: java.lang.IllegalThreadStateException
Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID Processor Elapsed (ms)
route-1/route-1 from[file://D://camelInput?fileName=input.txt] 2539704
...
route-1/process1 ref:postProcessor 0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalThreadStateException: null
at java.base/java.lang.Thread.start(Thread.java:792) ~[na:na]
at com.persistent.camel.PostProcessor.process(PostProcessor.java:27) ~[classes/:na]
at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65) ~[camel-support-3.20.3.jar:3.20.3]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[camel-core-processor-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:165) ~[camel-core-processor-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.20.3.jar:3.20.3]
at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.20.3.jar:3.20.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
2023-05-22 11:01:56.051 INFO 23484 --- [/D://camelInput] route-1 : msg2
2023-05-22 11:01:56.061 ERROR 23484 --- [/D://camelInput] o.a.c.p.e.DefaultErrorHandler : Failed delivery for (MessageId: 23D21B5DF3A3295-0000000000000002 on ExchangeId: 23D21B5DF3A3295-0000000000000002). Exhausted after delivery attempt: 1 caught: java.lang.IllegalThreadStateException
Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID Processor Elapsed (ms)
route-1/route-1 from[file://D://camelInput?fileName=input.txt] 2539720
...
route-1/process1 ref:postProcessor 0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalThreadStateException: null
at java.base/java.lang.Thread.start(Thread.java:792) ~[na:na]
at com.persistent.camel.PostProcessor.process(PostProcessor.java:27) ~[classes/:na]
at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65) ~[camel-support-3.20.3.jar:3.20.3]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[camel-core-processor-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:165) ~[camel-core-processor-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.20.3.jar:3.20.3]
at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.20.3.jar:3.20.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
2023-05-22 11:01:56.062 INFO 23484 --- [/D://camelInput] route-1 : msg3
2023-05-22 11:01:56.072 ERROR 23484 --- [/D://camelInput] o.a.c.p.e.DefaultErrorHandler : Failed delivery for (MessageId: 23D21B5DF3A3295-0000000000000003 on ExchangeId: 23D21B5DF3A3295-0000000000000003). Exhausted after delivery attempt: 1 caught: java.lang.IllegalThreadStateException
Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID Processor Elapsed (ms)
route-1/route-1 from[file://D://camelInput?fileName=input.txt] 2539730
...
route-1/process1 ref:postProcessor 0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalThreadStateException: null
at java.base/java.lang.Thread.start(Thread.java:792) ~[na:na]
at com.persistent.camel.PostProcessor.process(PostProcessor.java:27) ~[classes/:na]
at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65) ~[camel-support-3.20.3.jar:3.20.3]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[camel-core-processor-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:165) ~[camel-core-processor-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.20.3.jar:3.20.3]
at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.20.3.jar:3.20.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
2023-05-22 11:01:56.072 INFO 23484 --- [/D://camelInput] route-1 : msg4
2023-05-22 11:01:56.083 ERROR 23484 --- [/D://camelInput] o.a.c.p.e.DefaultErrorHandler : Failed delivery for (MessageId: 23D21B5DF3A3295-0000000000000004 on ExchangeId: 23D21B5DF3A3295-0000000000000004). Exhausted after delivery attempt: 1 caught: java.lang.IllegalThreadStateException
Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID Processor Elapsed (ms)
route-1/route-1 from[file://D://camelInput?fileName=input.txt] 2539741
...
route-1/process1 ref:postProcessor 0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalThreadStateException: null
at java.base/java.lang.Thread.start(Thread.java:792) ~[na:na]
at com.persistent.camel.PostProcessor.process(PostProcessor.java:27) ~[classes/:na]
at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65) ~[camel-support-3.20.3.jar:3.20.3]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[camel-core-processor-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:165) ~[camel-core-processor-3.20.3.jar:3.20.3]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[camel-base-engine-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206) ~[camel-file-3.20.3.jar:3.20.3]
at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.20.3.jar:3.20.3]
at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.20.3.jar:3.20.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
2023-05-22 11:01:56.083 WARN 23484 --- [/D://camelInput] o.a.c.c.file.GenericFileOnCompletion : Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@4436c7b8 for file: GenericFile[D:\camelInput\input.txt]
2023-05-22 11:01:57.047 INFO 23484 --- [ Thread-3] o.a.c.impl.engine.AbstractCamelContext : Suspended route-1 (file://D://camelInput)