I have a Quarkus 3.2 server that I am setting up to send messages to Kafka. I have a working implementation that is verified by a unit test using KafkaCompanion.
However, when the service is deployed, the configuration that sets the broker address does not seem to take effect; whenever a message is sent to the outgoing channel, I get the following error:
Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
I also do not see the topics being created on the Kafka server.
This is with the following config set:
mp.messaging.outgoing.events-outgoing.bootstrap.servers=oqm-infra-kafka:9092
mp.messaging.outgoing.events-outgoing.connector=smallrye-kafka
mp.messaging.outgoing.events-outgoing.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
I have also added kafka.bootstrap.servers=oqm-infra-kafka:9092, with the same result.
I also tried prefixing the host in the config with OUTSIDE://, as that was the config in the tests.
I have verified that the expected config is present in the running process by outputting the value from ConfigProvider.
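Beyond checking the config value itself, basic reachability of the configured address can be tested from inside the deployed container. The following is only a minimal JDK-only sketch under stated assumptions: `BrokerProbe` and its methods are hypothetical names that are not part of the service, the 2-second timeout is an arbitrary choice, and only simple `host:port` entries (optionally with a prefix such as `OUTSIDE://`) are handled.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic helper; not part of the service code. */
public class BrokerProbe {

    /** Host and port parsed from a single bootstrap.servers entry. */
    public static final class Address {
        public final String host;
        public final int port;

        Address(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Parses "host:port", tolerating a prefix such as "OUTSIDE://". */
    public static Address parse(String entry) {
        String s = entry.trim();
        int scheme = s.indexOf("://");
        if (scheme >= 0) {
            s = s.substring(scheme + 3); // drop the listener-style prefix
        }
        int colon = s.lastIndexOf(':');
        if (colon < 1 || colon == s.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + entry);
        }
        return new Address(s.substring(0, colon), Integer.parseInt(s.substring(colon + 1)));
    }

    /** Returns true if a TCP connection can be opened within timeoutMillis. */
    public static boolean isReachable(Address address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address.host, address.port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers unresolved hosts, refused connections, and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the address from the config above; pass another entry as the first argument.
        Address address = parse(args.length > 0 ? args[0] : "oqm-infra-kafka:9092");
        System.out.println(address + " reachable: " + isReachable(address, 2000));
    }
}
```

A successful connect only shows that the bootstrap address resolves and accepts TCP connections from that environment; it does not validate any address the broker reports back to the client afterwards.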
Messenger code:
@Slf4j
@ApplicationScoped
public class HistoryEventNotificationService {

    public static final String INTERNAL_EVENT_CHANNEL = "events-internal";
    public static final String OUTGOING_EVENT_CHANNEL = "events-outgoing";
    public static final String ALL_EVENT_TOPIC = "all-events";

    @Inject
    @Broadcast
    @Channel(INTERNAL_EVENT_CHANNEL)
    @OnOverflow(value = OnOverflow.Strategy.DROP)//TODO:: this better https://quarkus.io/version/3.2/guides/kafka#sending-messages-with-emitter
    Emitter<EventNotificationWrapper> internalEventEmitter;

    @Inject
    @Broadcast
    @Channel(OUTGOING_EVENT_CHANNEL)
    @OnOverflow(value = OnOverflow.Strategy.DROP)
    Emitter<ObjectHistoryEvent> outgoingEventEmitter;

    /**
     * Don't call this directly, use the other one
     */
    @WithSpan
    @Incoming(INTERNAL_EVENT_CHANNEL)
    void sendEventOutgoing(EventNotificationWrapper notificationWrapper) {
        log.info("Sending event to external channels: {}/{}", notificationWrapper.getClass().getSimpleName(), notificationWrapper.getEvent().getId());
        // log.info("Kafka server config: {}", ConfigProvider.getConfig().getValue("mp.messaging.outgoing.events-outgoing.bootstrap.servers", String.class));
        try {
            this.outgoingEventEmitter.send(
                Message.of(
                    notificationWrapper.getEvent()
                ).addMetadata(
                    OutgoingKafkaRecordMetadata.<String>builder()
                        .withTopic(notificationWrapper.getObjectName() + "-" + notificationWrapper.getEvent().getType())
                        .build()
                ));
            this.outgoingEventEmitter.send(
                Message.of(
                    notificationWrapper.getEvent()
                ).addMetadata(
                    OutgoingKafkaRecordMetadata.<String>builder()
                        .withTopic(ALL_EVENT_TOPIC)
                        .build()
                ));
            log.debug("Sent event to external channels: {}/{}", notificationWrapper.getClass().getSimpleName(), notificationWrapper.getEvent().getId());
        } catch(Throwable e) {
            log.error("FAILED to send event to external channels: {}/{}:", notificationWrapper.getClass().getSimpleName(), notificationWrapper.getEvent().getId(), e);
            throw e;
        }
    }

    public void sendEvent(Class<?> objectClass, ObjectHistoryEvent event) {
        this.sendEvents(objectClass, event);
    }

    public void sendEvents(Class<?> objectClass, ObjectHistoryEvent... events) {
        this.sendEvents(objectClass, Arrays.asList(events));
    }

    public void sendEvents(Class<?> objectClass, Collection<ObjectHistoryEvent> events) {
        for (ObjectHistoryEvent event : events) {
            log.info("Sending event to internal channel: {}/{}", objectClass.getSimpleName(), event.getId());
            if (event.getId() == null) {
                throw new NullPointerException("Null ID for " + event.getType() + " event given for object of type " + objectClass.getSimpleName());
            }
            this.internalEventEmitter.send(new EventNotificationWrapper(objectClass.getSimpleName(), event));
        }
    }
}
What can I do to make Quarkus recognize the server config?
Edit: Interestingly, I found this line in the log during startup:
SRMSG18258: Kafka producer kafka-producer-events-outgoing, connected to Kafka brokers 'OUTSIDE://oqm-infra-kafka:9092', is configured to write records to 'events-outgoing'
This suggests that the config is in fact being picked up, yet when the connection is actually made, the client somehow goes back to trying to connect to localhost?