I have an embedded Kafka instance running as part of a test. I'm trying to verify that all the messages have been read, but I'm getting an empty result from the Kafka admin client.
Map<TopicPartition, OffsetAndMetadata> partitionOffset = embeddedKafkaRule.getEmbeddedKafka().doWithAdminFunction(admin -> {
    try {
        return admin.listConsumerGroupOffsets(COUNTER_GROUP).partitionsToOffsetAndMetadata().get();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});
The map is always empty. I've tried setting acks to all and using a 100 ms offset commit interval, followed by a wait, to see if that makes any difference, but no luck.
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
System.setProperty("spring.cloud.stream.bindings.enricher-in-0.destination", COUNTER_TOPIC);
System.setProperty("spring.cloud.stream.bindings.enricher-in-0.group", COUNTER_GROUP);
System.setProperty("spring.cloud.stream.bindings.enricher-out-0.destination", ENRICHED_COUNTER_TOPIC);
System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.consumer.ackEachRecord", "true");
System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.autoCommitOffset", "true");
System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms", "100");
When are you setting those system properties? Are you sure the binding is using the embedded broker?
This works fine for me.
(The binder uses spring.kafka.bootstrap-servers if there is no binder-specific property.)
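One way to check this in the test is to print which broker list the properties would yield before the application context starts, and compare it with embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString(). The following is only a minimal sketch of the precedence described above (binder-specific property first, then spring.kafka.bootstrap-servers); the class BrokerPropertyCheck and the method expectedBrokers are hypothetical names made up for illustration and are not the binder's actual lookup code.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical debugging helper; not part of Spring Cloud Stream.
public class BrokerPropertyCheck {

    // Property names as they appear in the question and the answer.
    static final String BINDER_BROKERS = "spring.cloud.stream.kafka.streams.binder.brokers";
    static final String BOOT_BROKERS = "spring.kafka.bootstrap-servers";

    // Returns the broker list the test expects to be picked up:
    // the binder-specific property if set, otherwise the Boot-level one.
    static Optional<String> expectedBrokers(Properties props) {
        String binder = props.getProperty(BINDER_BROKERS);
        if (binder != null && !binder.trim().isEmpty()) {
            return Optional.of(binder);
        }
        String boot = props.getProperty(BOOT_BROKERS);
        if (boot != null && !boot.trim().isEmpty()) {
            return Optional.of(boot);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Run this at the point where the context is about to start.
        System.out.println(expectedBrokers(System.getProperties())
                .orElse("<no broker property set>"));
    }
}
```

If the printed value is empty or differs from the embedded broker's address at the moment the context is created, the binding is probably not talking to the embedded broker, which would explain an empty offsets map.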