Here is the configuration of my consumer:
spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        input:
          destination: greeting
          content-type: application/json
      kafka:
        binder:
          brokers: kafka
          zkNodes: zookeeper
The code of my app:
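// Logging imports assume Log4j 2, based on the LogManager and "{}" placeholder usage.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;

@SpringBootApplication
@EnableIntegration
@EnableBinding(CommandSink.class)
public class KafkaTesterApplication {
    private static final Logger logger = LogManager.getLogger(KafkaTesterApplication.class);

    /**
     * @param args
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaTesterApplication.class, args);
    }

    // Receives every message that arrives on the "input" channel bound by CommandSink.
    @ServiceActivator(inputChannel="input")
    public void receiveMessage(String message) {
        logger.debug("receive {}", message);
    }
}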
And the sink interface:
public interface CommandSink {
    public static final String CHANNEL = "input";

    @Input(CommandSink.CHANNEL)
    SubscribableChannel command();
}
It looks like the consumer doesn't connect to ZooKeeper or Kafka. Any idea?
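One way to rule out a plain network problem before digging into the binder is a bare TCP check against the hosts named in the configuration. This is only a minimal sketch: the class `BrokerReachability` and its method `canConnect` are hypothetical helpers, not part of Spring Cloud Stream or Kafka, and the ports used in the usage note are the usual defaults, which the configuration above does not state.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Spring Cloud Stream or Kafka.
final class BrokerReachability {

    private BrokerReachability() {
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }
}
```

Assuming the default ports, `BrokerReachability.canConnect("kafka", 9092, 2000)` and `BrokerReachability.canConnect("zookeeper", 2181, 2000)` would probe the two hosts from the configuration. A successful connection only shows that the host is reachable; it says nothing about whether the topics the consumer needs actually exist.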
OK, we found the solution.
We don't know why, but a topic was missing. The most curious part of the problem was that an old-style consumer (the one that connects through ZooKeeper) could still consume messages.
The missing topic was