Camel: How to retain aggregation result when exception occurs?

376 Views Asked by At

I have a route which multicast to 2 places. If a exception occurs when calling 1 of the place, I'm not able to retain the aggregation result. Inside the processor of the onException, the Map which I created during aggregating is not there. Im using camel 2.25.

onException(RuntimeException.class)
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            Map<String, String> results = exchange.getProperty(SimpleAggregationStrategy.RESULTS, Map.class);
            System.out.println(results);
        }
    });

from(DIRECT_FIRST)
    .log("First route")
    .setBody(constant("FIRST TEXT"));

from(DIRECT_SECOND)
    .log("Second route")
    .setBody(constant("SECOND TEXT"))
    .throwException(new RuntimeException("Dummy Exception"));

from(DIRECT_ENTRY)
    .multicast().stopOnException().aggregationStrategy(new AggregationStrategy() {
        public static final String RESULTS = "RESULTS";

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            System.out.println("INSIDE SimpleAggregationStrategy !!!!!!!!!!!!!!!!");
            Map<String, String> results;
            if (oldExchange != null) {
                results = oldExchange.getProperty(RESULTS, Map.class);
            } else {
                results = new HashMap<>();
            }
            results.put(newExchange.getIn().getBody(String.class), newExchange.getIn().getBody(String.class));
            return newExchange;
        }
    })
    .to(DIRECT_FIRST, DIRECT_SECOND);
2

There are 2 best solutions below

2
On

The solution is simpler than I thought. We only need to create a exchangeProperty before the multicast step. Then that exchange property can store the aggregation results even in the case of exception while doing multicast

onException(RuntimeException.class)
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            Map<String, String> results = exchange.getProperty("RESULTS", Map.class);
            System.out.println(results);
        }
    });

from(DIRECT_FIRST)
    .log("First route")
    .setBody(constant("FIRST TEXT"));

from(DIRECT_SECOND)
    .log("Second route")
    .setBody(constant("SECOND TEXT"))
    .throwException(new RuntimeException("Dummy Exception"));

from(DIRECT_ENTRY)
    .process(exch -> {
        exch.setProperty("RESULTS", new HashMap<String, String>())
    })
    .multicast().stopOnException().aggregationStrategy(new AggregationStrategy() {
        public static final String RESULTS = "RESULTS";

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            Map<String, String> results;
            if (oldExchange != null) {
                results = oldExchange.getProperty(RESULTS, Map.class);
            } else {
                results = new HashMap<>();
            }
            results.put(newExchange.getIn().getBody(String.class), newExchange.getIn().getBody(String.class));
            return newExchange;
        }
    })
    .to(DIRECT_FIRST, DIRECT_SECOND);
0
On

I assume that the aggregator aborts processing due to stopOnException() and therefore does not return the (incomplete) result.

You could try to put the aggregation strategy into a context managed bean and make the Map an instance variable that is accessible through a getter method.

In case of an exception you could try to get the incomplete Map from the bean. But I have no idea if it still holds the data or if it is emptied when the processing is aborted.