Suspend and Resume Apache Camel routes


I have multiple messages that should be sent to the MQ queue 'foo'. After 'route-1' sends a message, the program should wait for a reply on the MQ queue 'bar'. The next message should be sent only after that reply has arrived on 'bar'.
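The intended hand-off (send one message, block until the reply shows up, then send the next) can be pictured without Camel or a broker. The following is only a sketch under assumptions: the two in-memory `BlockingQueue`s stand in for the queues 'foo' and 'bar', the `responder` thread is a hypothetical remote system that answers every message, and the class name `ReplyGatedSender` is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class ReplyGatedSender {

    // Sends each message to 'foo' and waits for a reply on 'bar' before
    // sending the next one. Returns the observed order of events.
    static List<String> run(List<String> messages) {
        BlockingQueue<String> foo = new LinkedBlockingQueue<>(); // stand-in for queue 'foo'
        BlockingQueue<String> bar = new LinkedBlockingQueue<>(); // stand-in for queue 'bar'
        Semaphore replyReceived = new Semaphore(0);              // gate, initially closed
        List<String> events = Collections.synchronizedList(new ArrayList<>());

        // Hypothetical remote system: answers each message on 'foo' with a reply on 'bar'.
        Thread responder = new Thread(() -> {
            try {
                for (int i = 0; i < messages.size(); i++) {
                    bar.put("reply-to-" + foo.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Plays the part of route-2: consumes 'bar' and opens the gate once per reply.
        Thread replyListener = new Thread(() -> {
            try {
                for (int i = 0; i < messages.size(); i++) {
                    events.add("received " + bar.take());
                    replyReceived.release();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        responder.start();
        replyListener.start();

        // Plays the part of route-1: one send per permit.
        try {
            for (String message : messages) {
                events.add("sent " + message);
                foo.put(message);
                replyReceived.acquire(); // block until the reply for this message arrived
            }
            responder.join();
            replyListener.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        for (String event : run(Arrays.asList("msg1", "msg2", "msg3"))) {
            System.out.println(event);
        }
    }
}
```

The sender never holds more than one unanswered message, because each send needs a permit that only the reply listener hands out. In Camel terms the `release()` call would sit wherever 'bar' is consumed. Camel's JMS component also documents a request/reply mode (the `replyTo` option together with the InOut exchange pattern) that may cover this case without suspending routes; whether it fits the broker setup here is an assumption.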

Apache Camel documentation

I have followed the steps given in the documentation. However, all the messages are routed first, and only then is the route suspended.

input.txt

    msg1,msg2,msg3,msg4,msg5

beans.xml

    <camelContext id="orc-1" xmlns="http://camel.apache.org/schema/spring">
        <route id="route-1">
            <from uri="file://D://camelInput?fileName=input.txt" />
            <split>
                <tokenize token="," />
                <log message="${body}" />
                <to uri="jms:queue:foo" />
                <process ref="postProcessor" />
            </split>
        </route>
        <route id="route-2">
            <from uri="jms:queue:bar" />
            <!-- restart route-1 -->
        </route>
    </camelContext>

PostProcessor.java

    package com.persistent.camel;

    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;

    public class PostProcessor implements Processor {

        Thread stop;

        @Override
        public void process(Exchange exchange) throws Exception {
            // suspend this route using a separate thread, so that the route
            // is suspended gracefully while this exchange is still running
            if (stop == null) {
                stop = new Thread() {
                    @Override
                    public void run() {
                        try {
                            exchange.getContext().getRouteController().suspendRoute("route-1");
                        } catch (Exception e) {
                            // ignore
                        }
                    }
                };
            }
            // start the thread that suspends this route
            stop.start();
        }
    }
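The `IllegalThreadStateException` in the logs is consistent with how `java.lang.Thread` behaves: a `Thread` instance can be started only once. `PostProcessor` keeps a single instance in the `stop` field and calls `start()` on it for every split message, so every call after the first fails. A minimal, Camel-free sketch of that behaviour (the class and method names are hypothetical and exist only for this illustration):

```java
public class ThreadRestartDemo {

    // Returns true if a second start() on the same Thread instance
    // throws IllegalThreadStateException.
    static boolean secondStartFails() {
        Thread worker = new Thread(() -> { /* no work needed for the demo */ });
        worker.start();                 // first start: allowed
        try {
            worker.join();              // wait until the thread has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            worker.start();             // second start on the same instance
            return false;
        } catch (IllegalThreadStateException e) {
            return true;                // a Thread cannot be restarted
        }
    }

    public static void main(String[] args) {
        System.out.println("second start() fails: " + secondStartFails());
    }
}
```

This also appears to explain why every message is routed before the suspension takes effect: the suspend call runs on a separate thread and waits for inflight exchanges (the `ShutdownTask` line in the logs), while the splitter keeps iterating over the remaining tokens on the file consumer thread.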

logs

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::               (v2.7.11)

2023-05-22 11:01:52.812  INFO 23484 --- [           main] com.persistent.camel.CamelApplication    : Starting CamelApplication using Java 14.0.2 on PSL-5Q47XM3 with PID 23484 (D:\camel\camel\target\classes started by shashank_mugatkar in D:\camel\camel)
2023-05-22 11:01:52.815  INFO 23484 --- [           main] com.persistent.camel.CamelApplication    : No active profile set, falling back to 1 default profile: "default"
2023-05-22 11:01:54.951  INFO 23484 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 3.20.3 (orc-1) is starting
2023-05-22 11:01:55.146  INFO 23484 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Routes startup (started:3)
2023-05-22 11:01:55.147  INFO 23484 --- [           main] o.a.c.impl.engine.AbstractCamelContext   :     Started route-1 (file://D://camelInput)
2023-05-22 11:01:55.147  INFO 23484 --- [           main] o.a.c.impl.engine.AbstractCamelContext   :     Started route-2 (jms://queue:bar)
2023-05-22 11:01:55.147  INFO 23484 --- [           main] o.a.c.impl.engine.AbstractCamelContext   :     Started route-3 (direct://suspendRoute1)
2023-05-22 11:01:55.147  INFO 23484 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 3.20.3 (orc-1) started in 453ms (build:64ms init:195ms start:194ms)
2023-05-22 11:01:55.157  INFO 23484 --- [           main] com.persistent.camel.CamelApplication    : Started CamelApplication in 2.742 seconds (JVM running for 3.157)
2023-05-22 11:01:56.007  INFO 23484 --- [/D://camelInput] route-1                                  : msg0
2023-05-22 11:01:56.035  INFO 23484 --- [/D://camelInput] route-1                                  : msg1
2023-05-22 11:01:56.040  INFO 23484 --- [ - ShutdownTask] o.a.c.i.engine.DefaultShutdownStrategy   : Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 45 seconds. Inflights per route: [route-1 = 2]
2023-05-22 11:01:56.050 ERROR 23484 --- [/D://camelInput] o.a.c.p.e.DefaultErrorHandler            : Failed delivery for (MessageId: 23D21B5DF3A3295-0000000000000001 on ExchangeId: 23D21B5DF3A3295-0000000000000001). Exhausted after delivery attempt: 1 caught: java.lang.IllegalThreadStateException

Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source                                   ID                             Processor                                          Elapsed (ms)
                                         route-1/route-1                from[file://D://camelInput?fileName=input.txt]          2539704
    ...
                                         route-1/process1               ref:postProcessor                                             0

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------

java.lang.IllegalThreadStateException: null
    at java.base/java.lang.Thread.start(Thread.java:792) ~[na:na]
    at com.persistent.camel.PostProcessor.process(PostProcessor.java:27) ~[classes/:na]
    at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65) ~[camel-support-3.20.3.jar:3.20.3]
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[camel-core-processor-3.20.3.jar:3.20.3]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[camel-base-engine-3.20.3.jar:3.20.3]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.20.3.jar:3.20.3]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:165) ~[camel-core-processor-3.20.3.jar:3.20.3]
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[camel-base-engine-3.20.3.jar:3.20.3]
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492) ~[camel-file-3.20.3.jar:3.20.3]
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245) ~[camel-file-3.20.3.jar:3.20.3]
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206) ~[camel-file-3.20.3.jar:3.20.3]
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.20.3.jar:3.20.3]
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.20.3.jar:3.20.3]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

2023-05-22 11:01:56.051  INFO 23484 --- [/D://camelInput] route-1                                  : msg2
2023-05-22 11:01:56.061 ERROR 23484 --- [/D://camelInput] o.a.c.p.e.DefaultErrorHandler            : Failed delivery for (MessageId: 23D21B5DF3A3295-0000000000000002 on ExchangeId: 23D21B5DF3A3295-0000000000000002). Exhausted after delivery attempt: 1 caught: java.lang.IllegalThreadStateException

2023-05-22 11:01:56.062  INFO 23484 --- [/D://camelInput] route-1                                  : msg3
2023-05-22 11:01:56.072 ERROR 23484 --- [/D://camelInput] o.a.c.p.e.DefaultErrorHandler            : Failed delivery for (MessageId: 23D21B5DF3A3295-0000000000000003 on ExchangeId: 23D21B5DF3A3295-0000000000000003). Exhausted after delivery attempt: 1 caught: java.lang.IllegalThreadStateException

2023-05-22 11:01:56.072  INFO 23484 --- [/D://camelInput] route-1                                  : msg4

2023-05-22 11:01:56.083 ERROR 23484 --- [/D://camelInput] o.a.c.p.e.DefaultErrorHandler            : Failed delivery for (MessageId: 23D21B5DF3A3295-0000000000000004 on ExchangeId: 23D21B5DF3A3295-0000000000000004). Exhausted after delivery attempt: 1 caught: java.lang.IllegalThreadStateException

2023-05-22 11:01:56.083  WARN 23484 --- [/D://camelInput] o.a.c.c.file.GenericFileOnCompletion     : Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@4436c7b8 for file: GenericFile[D:\camelInput\input.txt]
2023-05-22 11:01:57.047  INFO 23484 --- [       Thread-3] o.a.c.impl.engine.AbstractCamelContext   : Suspended route-1 (file://D://camelInput)
