I am fairly new to Quarkus and Kafka and I have the following scenario.
I am writing a consumer that listens to a few topics. (They are logging topics that record the stages a message goes through in our systems: for example, I log an entry once a message has been sent to the main Kafka topic, and every consumer of that message then logs an entry to the logging topic once it has read and successfully processed the message. Not really relevant, but just for context.)
I have in the past written consumers in Quarkus that successfully consume 2 or more topics, but this one is throwing the following warnings on startup.
2024-03-18 13:55:45,720 WARN [io.smallrye.reactive.messaging.provider] (Quarkus Main Thread) SRMSG00208: The connector 'IncomingConnector{channel:'curr-in', attribute:'mp.messaging.incoming.curr-in'}' has no downstreams
2024-03-18 13:55:45,723 WARN [io.smallrye.reactive.messaging.provider] (Quarkus Main Thread) SRMSG00207: Some components are not connected to either downstream consumers or upstream producers:
- IncomingConnector{channel:'curr-in', attribute:'mp.messaging.incoming.curr-in'} has no downstream
- IncomingConnector{channel:'stud-in', attribute:'mp.messaging.incoming.stud-in'} has no downstream
After that, it does not consume records from either topic.
My application.properties file looks as follows:
## Kafka student log properties
mp.messaging.incoming.stud-in.connector=smallrye-kafka
mp.messaging.incoming.stud-in.topic=academia-student-logging-topic
mp.messaging.incoming.stud-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.stud-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.stud-in.health-enabled=true
mp.messaging.incoming.stud-in.health-readiness-enabled=true
mp.messaging.incoming.stud-in.auto.offset.reset=earliest
mp.messaging.incoming.stud-in.enable.auto.commit=false
mp.messaging.incoming.stud-in.max.poll.records=1
mp.messaging.incoming.stud-in.client.id=KafkaLogConsumer
mp.messaging.incoming.stud-in.retry=true
mp.messaging.incoming.stud-in.retry-attempts=-1
mp.messaging.incoming.stud-in.retry-max-wait=20
## Kafka curriculum log properties
mp.messaging.incoming.curr-in.connector=smallrye-kafka
mp.messaging.incoming.curr-in.topic=academia-curriculum-logging-topic
mp.messaging.incoming.curr-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.curr-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.curr-in.health-enabled=true
mp.messaging.incoming.curr-in.health-readiness-enabled=true
mp.messaging.incoming.curr-in.auto.offset.reset=earliest
mp.messaging.incoming.curr-in.enable.auto.commit=false
mp.messaging.incoming.curr-in.max.poll.records=1
mp.messaging.incoming.curr-in.client.id=KafkaLogConsumer
mp.messaging.incoming.curr-in.retry=true
mp.messaging.incoming.curr-in.retry-attempts=-1
mp.messaging.incoming.curr-in.retry-max-wait=20
and my code looks as follows:
@Incoming("stud-in")
@Incoming("curr-in")
@Retry(delay = 120, delayUnit = ChronoUnit.SECONDS, maxRetries = -1, maxDuration = 300, durationUnit = ChronoUnit.SECONDS)
public void receive(ConsumerRecord<String, String> event) throws Exception {
Strangely enough, if I take out one of the topics, it happily consumes the data, but the moment I put in 2 @Incoming annotations it bombs out. It's quite frustrating, as I have quite a few WORKING examples of this and I can't for the life of me spot any difference.
My pom looks as follows just for completeness sake:
<modelVersion>4.0.0</modelVersion>
<groupId>sun.kafka.readers</groupId>
<artifactId>integration-kafka-loglistener</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<compiler-plugin.version>3.10.1</compiler-plugin.version>
<maven.compiler.release>11</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>2.16.3.Final</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.0.0-M7</surefire-plugin.version>
</properties>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
The warning you describe appears when the channel configuration is present but no matching annotated method is found for it.
The reactive messaging annotations (@Incoming, @Outgoing, etc.) are not picked up unless they are used directly in a CDI bean. So this can happen if the channel annotations sit in a class that a CDI bean merely inherits from, rather than in the bean class itself.
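To illustrate the difference between "declared directly in the bean" and "inherited by the bean", here is a minimal plain-Java sketch. It is only an analogy: the `Incoming`/`Incomings` annotations below are local stand-ins (not the real SmallRye ones), the class names are hypothetical, and the idea that a scanner looks only at methods declared in the bean class itself is an assumption used to model the behaviour described above, not Quarkus's actual implementation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

public class ChannelScanSketch {

    // Local stand-in for a repeatable channel annotation (hypothetical, not the real API).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @Repeatable(Incomings.class)
    @interface Incoming { String value(); }

    // Container annotation required by @Repeatable.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Incomings { Incoming[] value(); }

    // Case 1: the annotated method lives directly in the class being scanned.
    static class DirectConsumer {
        @Incoming("stud-in")
        @Incoming("curr-in")
        public void receive(String event) { }
    }

    // Case 2: the annotated method lives in a parent class ...
    static class BaseConsumer {
        @Incoming("stud-in")
        @Incoming("curr-in")
        public void receive(String event) { }
    }

    // ... and the scanned class only inherits it.
    static class InheritingConsumer extends BaseConsumer { }

    // Collects channel names from methods declared in beanClass itself.
    // Inherited methods are deliberately ignored (the assumption being modelled).
    static Set<String> declaredChannels(Class<?> beanClass) {
        Set<String> channels = new TreeSet<>();
        for (Method m : beanClass.getDeclaredMethods()) {
            for (Incoming in : m.getAnnotationsByType(Incoming.class)) {
                channels.add(in.value());
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        System.out.println(declaredChannels(DirectConsumer.class));     // [curr-in, stud-in]
        System.out.println(declaredChannels(InheritingConsumer.class)); // []
    }
}
```

In the second case no channel is found, which in this model corresponds to the "has no downstream" situation: the configuration names `stud-in` and `curr-in`, but nothing is seen consuming them. If that matches your setup, moving the annotated `receive` method into the bean class itself (the one carrying the scope annotation) would be the thing to try.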