Embedded Kafka not showing consumer offset


I have an embedded Kafka instance running as part of a test. I'm trying to verify that all the messages have been read, but the Kafka admin client returns an empty result.

Map<TopicPartition, OffsetAndMetadata> partitionOffset = embeddedKafkaRule.getEmbeddedKafka().doWithAdminFunction(admin -> {
    try {
        return admin.listConsumerGroupOffsets(COUNTER_GROUP).partitionsToOffsetAndMetadata().get();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});

The map is always empty. I've tried setting ack all and a 100 ms auto offset commit interval, with a wait afterwards, to see whether it makes any difference, but no luck.

System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers",
        embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
System.setProperty("spring.cloud.stream.bindings.enricher-in-0.destination", COUNTER_TOPIC);
System.setProperty("spring.cloud.stream.bindings.enricher-in-0.group", COUNTER_GROUP);
System.setProperty("spring.cloud.stream.bindings.enricher-out-0.destination", ENRICHED_COUNTER_TOPIC);
System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.consumer.ackEachRecord", "true");
System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.autoCommitOffset", "true");
System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms", "100");

There is 1 best solution below

When are you setting those system properties? Are you sure the binding is using the embedded broker?

This works fine for me.

@SpringBootApplication
public class So65329718Application {

    public static void main(String[] args) {
        SpringApplication.run(So65329718Application.class, args);
    }

    @Bean
    Consumer<String> consume() {
        return System.out::println;
    }

}

spring.cloud.stream.bindings.consume-in-0.group=theGroup

@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So65329718ApplicationTests {

    @Autowired
    KafkaTemplate<byte[], byte[]> template;

    @Autowired
    EmbeddedKafkaBroker broker;

    @Test
    void test() throws InterruptedException {
        this.template.send("consume-in-0", "foo".getBytes());
        Thread.sleep(10_000);
        this.broker.doWithAdmin(admin -> {
            try {
                System.out.println(admin.listConsumerGroupOffsets("theGroup").partitionsToOffsetAndMetadata()
                        .get(10, TimeUnit.SECONDS));
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                e.printStackTrace();
            }
        });
    }

}

foo
...
{consume-in-0-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}

(The binder uses spring.kafka.bootstrap-servers when no binder-specific broker property is set.)
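
As a side note on the fixed Thread.sleep(10_000) in the test above: one possible alternative is to poll the offset lookup until it returns something or a timeout expires. The sketch below is only an illustration under assumptions. OffsetPoller and pollUntilNonEmpty are hypothetical names, not part of Spring or Kafka, and the lookup in main is a stand-in that fakes the admin call so the example runs without a broker.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical helper (not a Spring or Kafka class): retries a lookup until
// it yields a non-empty map or the timeout elapses.
public class OffsetPoller {

    /**
     * Calls the lookup repeatedly, sleeping for the given interval between
     * attempts. Returns the first non-empty result, or an empty map if
     * nothing showed up before the timeout.
     */
    public static <K, V> Map<K, V> pollUntilNonEmpty(Callable<Map<K, V>> lookup,
            Duration timeout, Duration interval) throws Exception {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Map<K, V> result = lookup.call();
            if (result != null && !result.isEmpty()) {
                return result;
            }
            // Overflow-safe comparison against the deadline.
            if (System.nanoTime() - deadline >= 0) {
                return Map.of();
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the admin client call: reports no committed offsets
        // on the first two attempts, then one committed offset.
        int[] calls = {0};
        Callable<Map<String, Long>> fakeLookup = () -> {
            calls[0]++;
            if (calls[0] < 3) {
                return Map.of();
            }
            return Map.of("consume-in-0-0", 1L);
        };

        Map<String, Long> offsets = OffsetPoller.<String, Long>pollUntilNonEmpty(
                fakeLookup, Duration.ofSeconds(5), Duration.ofMillis(50));
        System.out.println(offsets + " after " + calls[0] + " calls");
        // prints: {consume-in-0-0=1} after 3 calls
    }
}
```

In a real test, the lookup would presumably wrap the admin.listConsumerGroupOffsets(...).partitionsToOffsetAndMetadata().get(...) call shown earlier. An empty result after the timeout then points to the group never committing, rather than the check simply running too early.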