How to build a Flink Kafka producer with exactly-once behavior?


I have a simple Flink application that consumes alerts from one Kafka topic and publishes them to another Kafka topic.

I have set the exactly-once delivery guarantee on the data sink. However, my consumer does not consume the data when I set this guarantee on the sink.

Here are the versions: JVM - JDK 11, Flink - 1.15.2

Producer -

        KafkaSink<String> mySink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("Msg_Offset_MGMG_Tx_2")
                .setKafkaProducerConfig(producerConfig)             
                .setRecordSerializer(new OffsetSerializer("MySink"))                
                .build();  

I tried passing the following properties to the producer -

        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 600000);
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
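
One setting that may be relevant here, offered as a guess rather than a confirmed diagnosis: with EXACTLY_ONCE, the Flink Kafka sink uses a default producer `transaction.timeout.ms` of 1 hour, while Kafka brokers cap it at `transaction.max.timeout.ms`, which defaults to 15 minutes. The sketch below builds the same producer properties with an explicit transaction timeout under that cap. The class `ExactlyOnceProducerProps`, its `build` method, and the 10 minute value are hypothetical and only for illustration; plain string keys are used instead of the `ProducerConfig` constants so the snippet runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or Kafka.
final class ExactlyOnceProducerProps {

    // Kafka broker default for transaction.max.timeout.ms (15 minutes).
    // Assumption: the broker in question still uses this default.
    static final long BROKER_DEFAULT_MAX_TX_TIMEOUT_MS = 15 * 60 * 1000L;

    // Builds producer properties with a transaction timeout that stays
    // within the assumed broker limit.
    static Properties build(long transactionTimeoutMs) {
        if (transactionTimeoutMs <= 0
                || transactionTimeoutMs > BROKER_DEFAULT_MAX_TX_TIMEOUT_MS) {
            throw new IllegalArgumentException(
                    "transaction.timeout.ms must be in (0, "
                            + BROKER_DEFAULT_MAX_TX_TIMEOUT_MS + "], got "
                            + transactionTimeoutMs);
        }
        Properties props = new Properties();
        // Same two settings as in the question, written as string values.
        props.setProperty("max.block.ms", "600000");
        props.setProperty("enable.idempotence", "true");
        // Explicit timeout so the sink's 1 hour default is not used.
        props.setProperty("transaction.timeout.ms",
                Long.toString(transactionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties producerConfig = build(10 * 60 * 1000L);
        System.out.println(producerConfig);
    }
}
```

The resulting `Properties` object could be passed to `setKafkaProducerConfig(...)` in place of `producerConfig`. Separately, a sink in EXACTLY_ONCE mode commits its Kafka transactions only when a Flink checkpoint completes, so a `read_committed` consumer will see no records if checkpointing is not enabled on the job.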

Consumer -

        Properties prop = new Properties();
        prop.put("commit.offsets.on.checkpoint", "true");
        prop.put("enable.auto.commit", "false");
        prop.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
        KafkaSource<Long> dataSource = KafkaSource.<Long>builder()
                .setBootstrapServers(brokers)
                .setTopics("Alerts")
                .setGroupId("alertss-1")
                .setProperties(prop)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setClientIdPrefix("Alerts_01_temp_2")              
                .setDeserializer("mycustomSerializer")
                .build();

However, when I run this, the sink does not publish any records even though messages are present in the source topic. When I remove the lines below from the sink, it works -

                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("Msg_Offset_MGMG_Tx_2")

Can anyone please tell me what I am doing wrong?
