How to do batch processing with Apache Apex?


How can I create a batch processing application with Apache Apex?

All the examples I've found are streaming applications, which means they never terminate. I would like my app to shut down once it has processed all the data.

Thanks


1
Scorpio On

When running in local mode, you can set an exit condition on the local cluster before running the app. For example:

public void testMapOperator() throws Exception
{
  LocalMode lma = LocalMode.newInstance();
  DAG dag = lma.getDAG();

  // Build the DAG: number generator -> map (square) -> collector
  NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
  FunctionOperator.MapFunctionOperator<Integer, Integer> mapper
      = dag.addOperator("mapper", new FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square()));
  ResultCollector collector = dag.addOperator("collector", new ResultCollector());

  dag.addStream("raw numbers", numGen.output, mapper.input);
  dag.addStream("mapped results", mapper.output, collector.input);

  // Create local cluster
  LocalMode.Controller lc = lma.getController();
  lc.setHeartbeatMonitoringEnabled(false);

  // Condition to exit the application: the local cluster stops once call() returns true
  ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
  {
    @Override
    public Boolean call() throws Exception
    {
      // TupleCount, NumTuples and sum are fields of the enclosing test class
      return TupleCount == NumTuples;
    }
  });

  lc.run();

  // JUnit argument order is (expected, actual)
  Assert.assertEquals(285, sum);
}

For the complete code, refer to https://github.com/apache/apex-malhar/blob/master/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java

2
Sandeep Deshmukh On

What is your use-case? Supporting batch natively is on the roadmap and is being worked on right now.

Alternatively, until then, once you are sure that your processing is done, the input operator can throw a ShutdownException. That signal will propagate through the DAG and shut it down.
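To make the idea concrete, here is a minimal sketch of the pattern that runs without any Apex dependency. Everything in it is a stand-in for illustration: the nested ShutdownException class, BoundedInput, and runToCompletion are hypothetical names, and the driver loop only imitates the engine calling emitTuples() repeatedly. In a real application, as far as I know, you would throw com.datatorrent.api.Operator.ShutdownException from your input operator's emitTuples() and let the platform handle the rest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch only: models the shutdown pattern with plain Java, no Apex classes.
class BatchShutdownSketch {

  // Hypothetical stand-in for Apex's ShutdownException (assumed unchecked).
  static class ShutdownException extends RuntimeException {}

  // Hypothetical bounded input operator: emits one record per call and
  // signals shutdown once the finite source is exhausted.
  static class BoundedInput {
    private final Iterator<Integer> source;
    private final List<Integer> downstream;

    BoundedInput(List<Integer> data, List<Integer> downstream) {
      this.source = data.iterator();
      this.downstream = downstream;
    }

    // Plays the role of an input operator's emitTuples() method.
    void emitTuples() {
      if (!source.hasNext()) {
        // All data has been processed: ask the engine to stop.
        throw new ShutdownException();
      }
      int n = source.next();
      downstream.add(n * n); // "emit" the squared value downstream
    }
  }

  // Hypothetical driver standing in for the engine's call loop.
  static List<Integer> runToCompletion(List<Integer> data) {
    List<Integer> results = new ArrayList<>();
    BoundedInput input = new BoundedInput(data, results);
    try {
      while (true) {
        input.emitTuples();
      }
    } catch (ShutdownException e) {
      // Normal end of the batch, not an error.
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println(runToCompletion(Arrays.asList(1, 2, 3))); // prints [1, 4, 9]
  }
}
```

The point of the design is that only the input operator knows when the data is finished, so it is the natural place to raise the signal; downstream operators do not need their own termination logic.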

Let us know if you need further details.