Rolling back individual input files when sending after aggregation fails


I'm currently trying to migrate a legacy system to Camel. The current system uses a lot of folders as queues and processes all messages in each of these queues sequentially, instead of completing the full flow for one message before starting on the next. We want to get rid of this, and that seems possible with Camel (which I already knew from previous experience with it). There is only one 'problem': I do not seem to be able to keep the flow definitions simple. In text, in simplified form, the flow is like this:

  • Poll a certain folder for files
  • Some files need to be aggregated, some can be sent out directly (currently based on filename, but this could be content based too in the future)
  • Aggregate files based on a certain field in the content
  • Send files to a remote system

This works great in happy-day scenarios. The next step was to add error handling, and for this we have one important requirement: individual files from the input folder that were aggregated but, after retries, could not be sent out should end up as individual files in an error folder.

And here is the problem, at least when it comes to keeping the flows simple. I say 'at least' because with (a lot of) additional nodes and steps I can achieve what I want (I won't post that example, since it is not what I want advice about). The flow that I'm working with is this:

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:camel="http://camel.apache.org/schema/spring"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring https://camel.apache.org/schema/spring/camel-spring-3.4.0.xsd">

    <bean class="org.apache.camel.processor.aggregate.StringAggregationStrategy" id="myAggregationStrategy"/>
    
    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route id="_route1">
            <from uri="file://c:/data/store/merge/?moveFailed=.error"/>
                           
            <setHeader headerName="MyGroupingID" id="_setHeader1">
                <simple resultType="int">${header.CamelFileName.split('_|\.')[0]}</simple>
            </setHeader>
            <aggregate  strategyRef="myAggregationStrategy">
                <correlationExpression>
                    <header>MyGroupingID</header>
                </correlationExpression>
                <completionTimeout>
                    <constant>5000</constant>
                </completionTimeout>
                <choice id="_choice1">
                    <when id="_when1">
                        <simple>${exchange.properties['CamelAggregatedCorrelationKey']} == 1</simple>
                        <throwException exceptionType="java.lang.Exception" id="_throwException1" message="Group 1 has an error"/>
                    </when>
                    <otherwise id="_otherwise1">
                        <log id="processMessage" message="Processed exchange: [${exchange}] body: [${body}] headers: [${headers}]."/>
                    </otherwise>
                </choice>
            </aggregate>
        </route>
    </camelContext>
</beans>
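
For reference, the Simple expression in the setHeader step splits the file name on underscores and dots and keeps the first token as the grouping ID. A minimal plain-Java sketch of the same rule follows; the class and method names are hypothetical and not part of Camel:

```java
// Hypothetical helper (not Camel API) mirroring
// ${header.CamelFileName.split('_|\.')[0]} with resultType="int".
class GroupingKeySketch {

    // Splits on '_' or '.' and parses the first token as the grouping ID.
    static int groupingId(String fileName) {
        String firstToken = fileName.split("_|\\.")[0];
        return Integer.parseInt(firstToken);
    }

    public static void main(String[] args) {
        // File names taken from the unit test in this question.
        for (String name : new String[] {"1_2_1.txt", "1_2_2.txt", "2_1_1.txt"}) {
            System.out.println(name + " -> group " + groupingId(name));
        }
    }
}
```

With the file names used in the unit test, the first two files fall into group 1 (the group that is made to fail) and the third into group 2.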

What happens when using an aggregator is that once an individual message enters the aggregator node and is processed, its file is moved from the input folder to the default .camel folder (or whatever folder you specify). So I turned on trace-level logging and started investigating why this happens. The actual moving is done by GenericFileOnCompletion, which in turn seems to be registered as a Synchronization in the UnitOfWork and is triggered when there is nothing more to do with the Exchange. That is sort of true, since the AggregateProcessor seems to make a copy of the exchange.

What I've tried:

  • Adding a custom onCompletion to the route (or globally), but it only fires for the original Exchanges
  • Adding an error handler, but it only fires for the aggregated Exchange

I debugged some more and noticed that with a custom aggregation strategy based on the GroupedExchangeAggregationStrategy I could have a new 'aggregated' body and keep the individual exchanges as a List<Exchange> in a property on the aggregated exchange. I (think I) can use this list of copies of the original exchanges in an errorHandler: iterate over the exchanges, use their references to the original location, and move each file from the .camel folder to a .error folder (both folder names to be retrieved from the file endpoint). All of this would live in a processor, so the logic stays hidden from our flow developers.
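
To make the idea concrete, here is a rough sketch of only the file-moving part, written in plain Java without any Camel types. Everything in it is an assumption for illustration: the class and method names are hypothetical, the folder names are hard-coded instead of being read from the file endpoint, and the file names are passed in as strings where the real processor would presumably read them from each exchange in the grouped list.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not Camel API): moves already-consumed files
// from the .camel folder to the .error folder of the polled directory.
class FailedGroupMoverSketch {

    // inputDir:  the polled folder
    // fileNames: names of the files that made up the failed aggregate
    // Returns the new locations of the files that were actually moved.
    static List<Path> moveToError(Path inputDir, List<String> fileNames) throws IOException {
        Path doneDir = inputDir.resolve(".camel");   // assumed: default folder for consumed files
        Path errorDir = inputDir.resolve(".error");  // assumed: same folder as moveFailed=.error
        Files.createDirectories(errorDir);

        List<Path> moved = new ArrayList<>();
        for (String name : fileNames) {
            Path source = doneDir.resolve(name);
            if (!Files.exists(source)) {
                continue; // nothing to roll back for this name
            }
            Path target = errorDir.resolve(name);
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            moved.add(target);
        }
        return moved;
    }
}
```

In the error handler this would be called once per failed aggregate, with the names collected from the CamelFileName header of each grouped exchange. It does not address the crash-safety concern mentioned below.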

Other things I'm aware of:

  • I can create my own processor to do what the aggregator does, but I'm not sure whether I can keep the original exchanges 'alive'
  • I possibly need some 'persistence' (like leveldb for aggregation) to be able to survive system crashes

Unit test: to be able to reproduce things

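import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.reifier.RouteReifier;
import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner;
import org.apache.camel.test.spring.DisableJmx;
import org.apache.camel.test.spring.UseAdviceWith;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;

@RunWith(CamelSpringJUnit4ClassRunner.class)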
@ContextConfiguration({"/META-INF/spring/applicationContext.xml"})
@DisableJmx(true)
@UseAdviceWith(true)
public class MessageGroupingTest {

  private static final Logger log = LoggerFactory.getLogger(MessageGroupingTest.class);

  @Autowired
  private CamelContext camelContext;
  
  private final AtomicBoolean adviced = new AtomicBoolean(false);

  @Produce(uri = "file://c:/data/store/merge/")
  private ProducerTemplate producer;

  @EndpointInject(uri = "mock:semiStreamingGrouping")
  private MockEndpoint mock;

  @Before
  public void adviceGroupingRoute() throws Exception {
    if (!adviced.get()) {
      ModelCamelContext mcc = camelContext.adapt(ModelCamelContext.class);
      RouteReifier.adviceWith(mcc.getRouteDefinition("_route1"), mcc, new AdviceWithRouteBuilder() {
        @Override
        public void configure() throws Exception {
          // Forward every successfully processed aggregate to the mock endpoint
          weaveById("processMessage").after().to(mock);
        }
      });
      camelContext.start();
      adviced.set(true);
    }
  }
  
  @Test
  public void testGroupingMessages() throws Exception {
    String message = "Hello world!";

    // Group 1 throws in the route, so only the aggregate for group 2 reaches the mock
    mock.expectedMessageCount(1);

    Map<String, Object> headers = new HashMap<>();
    headers.put("MyGroupingID", 1);

    headers.put("CamelFileName", "1_2_1.txt");
    producer.sendBodyAndHeaders(message, headers);

    headers.put("CamelFileName", "1_2_2.txt");
    producer.sendBodyAndHeaders(message, headers);

    headers.put("CamelFileName", "2_1_1.txt");
    producer.sendBodyAndHeaders(message, headers);

    // Wait for the 5000 ms completionTimeout before asserting
    Thread.sleep(5000);
    mock.assertIsSatisfied(300000L);
  }
}

Using Camel 3.6.0 (I also tried 2.25.2 with all the related (back)changes and saw the same issue, so it is not some sort of regression ;-))

I'm not looking for a complete solution in code; hints about possibilities or impossibilities (unsolvable problems) would be fine too (although if there is a simple solution I would not mind being referred to it or having it as an answer ;-))
