I have an embedded Kafka instance running as part of a test. I'm trying to verify that all the messages have been read, but I'm getting an empty result from the Kafka admin client.
Map<TopicPartition, OffsetAndMetadata> partitionOffset = embeddedKafkaRule.getEmbeddedKafka().doWithAdminFunction(admin -> {
    try {
        return admin.listConsumerGroupOffsets(COUNTER_GROUP).partitionsToOffsetAndMetadata().get();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});
The map is always empty. I've tried acknowledging each record (ackEachRecord) and enabling auto offset commit with a 100 ms commit interval, followed by a wait before the check, to see if this makes any difference, but no luck.
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers",
        embeddedKafkaRule.getEmbeddedKafka().getBrokersAsString());
System.setProperty("spring.cloud.stream.bindings.enricher-in-0.destination", COUNTER_TOPIC);
System.setProperty("spring.cloud.stream.bindings.enricher-in-0.group", COUNTER_GROUP);
System.setProperty("spring.cloud.stream.bindings.enricher-out-0.destination", ENRICHED_COUNTER_TOPIC);
System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.consumer.ackEachRecord", "true");
System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.autoCommitOffset", "true");
System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms", "100");
 
                        
When are you setting those system properties? Are you sure the binding is using the embedded broker?
This works fine for me.
(The binder uses spring.kafka.bootstrap-servers if there is no binder-specific property.)
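
If the broker and the group turn out to be right, a single fixed wait may still be too short for the first commit to show up, so it can help to poll the admin client instead of checking once. Here is a minimal sketch using only the JDK. `OffsetPolling` and `awaitNonEmpty` are names made up for illustration; they are not part of Spring Kafka or the Kafka client, and the lookup you pass in is assumed to be the `doWithAdminFunction` call from the question.

```java
import java.time.Duration;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical test helper; not part of Spring Kafka or the Kafka client. */
final class OffsetPolling {

    private OffsetPolling() {
    }

    /**
     * Calls the lookup repeatedly until it returns a non-empty map or the
     * timeout elapses. Returns the last result either way, so the caller
     * can assert on it and see what was (or was not) found.
     */
    static <K, V> Map<K, V> awaitNonEmpty(Supplier<Map<K, V>> lookup,
                                          Duration timeout,
                                          Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        Map<K, V> result = lookup.get();
        // Compare via subtraction so the check stays correct if nanoTime wraps.
        while ((result == null || result.isEmpty())
                && System.nanoTime() - deadline < 0) {
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling early.
                Thread.currentThread().interrupt();
                break;
            }
            result = lookup.get();
        }
        return result;
    }
}
```

In the test you would wrap the existing admin lambda in a supplier and pass something like `Duration.ofSeconds(10)` and `Duration.ofMillis(200)`. If the map is still empty after the timeout, that points back at the two questions above (timing of the system properties, and which broker the binding really uses) rather than at commit timing.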