Flink Kafka consumer group.id does not work


I use Apache Flink with FlinkKafkaConsumer. I have 2 consumers reading 1 topic (1 partition). At a given moment, both consumers read the same event. I expect consumer 1 to receive event1 and consumer 2 to not receive event1. I set group.id, but it does not work. I checked Flink's documentation, but group.id does not seem to support this.

Example: topic (e1,e2,e3,e4,e5)

result:

consumer 1: e1,e2,e3,e4,e5

consumer 2: e1,e2,e3,e4,e5

expected:

consumer 1: e1,e3

consumer 2: e2,e4,e5

This is my code:

String kafkaBootstrapServers = config.get(SOURCE_KAFKA_BOOTSTRAP_SERVERS);
String kafkaGroupId = config.get(SOURCE_KAFKA_GROUPID);
String topic = config.get(SOURCE_KAFKA_TOPIC);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
properties.setProperty("group.id", kafkaGroupId);

properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
properties.setProperty("ssl.truststore.location", config.get(SOURCE_KAFKA_TRUSTSTORE_LOCATION));
properties.setProperty("ssl.truststore.password", config.get(SOURCE_KAFKA_TRUSTSTORE_PASS));
properties.put("sasl.jaas.config",
       String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkascram\" password=\"%s\";", config.get(SOURCE_KAFKA_TRUSTSTORE_PASS)));

FlinkKafkaConsumer<EventBase> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleSyslogEventSchema(), properties);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;

1 Answer


The FlinkKafkaConsumer was deprecated in Flink 1.14 and replaced by the newer KafkaSource API, which you'd likely want to use instead. A similar implementation of the same consumer would look something like this:

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<EventBase> source = KafkaSource.<EventBase>builder()
    .setBootstrapServers(kafkaBootstrapServers)
    .setTopics(topic)
    .setGroupId(kafkaGroupId)
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleSyslogEventSchema())
    // Add your SASL/SSL configuration here
    .setProperty("security.protocol", "SASL_SSL")
    .setProperty("sasl.mechanism", "SCRAM-SHA-512")
    .setProperty("ssl.endpoint.identification.algorithm", "")
    .setProperty("ssl.truststore.location", config.get(SOURCE_KAFKA_TRUSTSTORE_LOCATION))
    .setProperty("ssl.truststore.password", config.get(SOURCE_KAFKA_TRUSTSTORE_PASS))
    .setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkascram\" password=\"%s\";", config.get(SOURCE_KAFKA_TRUSTSTORE_PASS)))
    .build();
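
For completeness, the source built above would then be attached to the job roughly like this; a minimal sketch where env is assumed to be your existing StreamExecutionEnvironment and the source name is arbitrary:

// Assumes: org.apache.flink.api.common.eventtime.WatermarkStrategy and
// org.apache.flink.streaming.api.datastream.DataStream are imported.
DataStream<EventBase> events = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "kafka-source");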

Configuration-wise, your existing group.id looks correct for the older FlinkKafkaConsumer API. However, it's important that the value is the same across all instances (preferably a constant for the job) so that consumer-group offsets are shared as you would expect.
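
For example, a shared constant keeps every instance of the job committing offsets to the same Kafka consumer group; the constant name and value below are just illustrative:

// Every instance of the job must use this same value so that committed
// offsets land in one Kafka consumer group (name and value are illustrative).
public static final String KAFKA_GROUP_ID = "syslog-event-consumers";

properties.setProperty("group.id", KAFKA_GROUP_ID);   // older FlinkKafkaConsumer
// or, with the newer KafkaSource builder:
// .setGroupId(KAFKA_GROUP_ID)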