Spring Batch remote partitioning with Kafka


I am using Spring Batch integration with Kafka for remote partitioning. A job was running with 1 manager and 2 workers when all of the manager and worker servers suddenly stopped/restarted, leaving the job stuck in the STARTED (running) state. I manually updated the BATCH_JOB_EXECUTION and BATCH_STEP_EXECUTION tables so that I could start a new job instance. But when I started the new job, the workers/consumers also read the previous unread (lag) messages from the Kafka topic, and data processing was duplicated. How can I force a new job instance to pick up only the newly produced partition messages, ignoring the unread lag messages left over from the previous job instance? I am using a DirectChannel in the worker's inbound flow (inboundFlowWorker):

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory) {
        return IntegrationFlows
                // Polled Kafka source that reads partition requests from the "requestForWorkers" topic
                .from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties("requestForWorkers")))
                // DirectChannel bean (defined elsewhere) that hands each request to the worker step
                .channel(requestForWorkers())
                .get();
    }
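
One way to make a fresh job instance skip the backlog is to give the workers' consumer a new group.id for each run: Kafka's auto.offset.reset=latest only takes effect when the group has no committed offsets, so a never-before-seen group subscribes at the end of the topic and the stale partition requests are never delivered. Below is a minimal sketch of such a consumer factory; the bean name, bootstrap address, per-run group id, and String deserializers are illustrative assumptions, not part of the original setup:

    import java.util.Map;
    import java.util.UUID;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    // Illustrative consumer factory: a fresh group.id per run means no committed
    // offsets exist, so auto.offset.reset=latest starts at the end of the topic
    // and the leftover requests from the crashed job instance are never consumed.
    @Bean
    public ConsumerFactory<String, String> workerConsumerFactory() {
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",      // assumed broker address
                ConsumerConfig.GROUP_ID_CONFIG, "workers-" + UUID.randomUUID(), // hypothetical per-run group
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

Note the trade-off: with latest, the workers must already be subscribed (rebalance complete) before the manager publishes the new partition requests, otherwise those messages can be skipped as well.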