I am using Spring Batch Integration with Kafka for remote partitioning. A job was running with 1 manager and 2 workers when the manager and all worker servers suddenly stopped and restarted, and the job execution remained in the STARTED (running) state. I then manually updated BATCH_JOB_EXECUTION and BATCH_STEP_EXECUTION so that I could start a new instance of the job. When I started the new job, the workers (consumers) also began reading the earlier, unread (lagging) messages from the Kafka topic, so data was processed twice.

How can I make sure that when a new job instance starts, the workers pick up only the partition messages produced for it, and not the unread messages left over from the previous job instance? On the worker side I am using a DirectChannel; the inbound flow is:

    public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
        // Poll the "requestForWorkers" topic and pass each record to the worker request channel
        return IntegrationFlows
                .from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("requestForWorkers")))
                .channel(requestForWorkers())
                .get();
    }
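
To make the "latest messages only" requirement concrete, here is a minimal sketch of one consumer configuration that would express it. It is not a confirmed fix. The keys `bootstrap.servers`, `group.id` and `auto.offset.reset` are standard Kafka consumer settings, but the class name, the method name, the `runId` parameter and the `workers-` group naming scheme are hypothetical and not part of my current setup. The idea is that a consumer group with no committed offsets, combined with `auto.offset.reset=latest`, starts at the end of each partition, so older unread records are skipped.

```java
import java.util.HashMap;
import java.util.Map;

public class FreshGroupConsumerProps {

    // Builds consumer settings for one run of the job.
    // Assumption: runId is some identifier that all workers of the same run share.
    public static Map<String, Object> forRun(String bootstrapServers, String runId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        // A group id that was never used before has no committed offsets.
        props.put("group.id", "workers-" + runId);
        // Without committed offsets, "latest" positions the consumer at the log end.
        props.put("auto.offset.reset", "latest");
        // Deserializer settings are omitted here; they would be the same ones
        // the existing ConsumerFactory already uses.
        return props;
    }
}
```

Such a map, with deserializers added, could be handed to the `ConsumerFactory` that is passed into `inboundFlow`. One caveat with this approach: any partition message the manager sends before the workers have joined the new group and been assigned partitions would also be skipped, so the workers would need to be up before the job is launched.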