Quarkus Kafka bootstrap server config not taking effect when deployed


I have a Quarkus 3.2 server that I am setting up to send messages to Kafka. I have a working implementation that is verified via unit test using KafkaCompanion.

However, when the service is deployed, the configuration that sets the Kafka server address does not seem to take effect. Whenever a message is sent to the outgoing channel, I get the following error:

Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
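
One detail in this message may be worth reading closely. As far as I understand the Kafka client, the addresses from bootstrap.servers are given negative node ids (-1, -2, ...), while non-negative ids come from the cluster metadata the broker returns after a successful bootstrap. If that holds, "node 1" would mean the bootstrap connection worked and localhost:9092 is the address the broker advertised. The sketch below only classifies such a log line under that assumption; the class and method names are hypothetical and not part of Quarkus or Kafka.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: classifies a Kafka client "Connection to node" log line.
// Assumption: negative node ids are bootstrap addresses, non-negative ids are
// brokers taken from the cluster metadata (i.e. advertised by the broker).
public class KafkaNodeLogCheck {

    private static final Pattern NODE_LINE =
        Pattern.compile("Connection to node (-?\\d+) \\(([^)]*)\\)");

    /** Returns "BOOTSTRAP", "ADVERTISED", or "UNKNOWN" if the line does not match. */
    static String classify(String logLine) {
        Matcher m = NODE_LINE.matcher(logLine);
        if (!m.find()) {
            return "UNKNOWN";
        }
        int nodeId = Integer.parseInt(m.group(1));
        return nodeId < 0 ? "BOOTSTRAP" : "ADVERTISED";
    }

    /** Returns the address part inside the parentheses, or null if the line does not match. */
    static String address(String logLine) {
        Matcher m = NODE_LINE.matcher(logLine);
        return m.find() ? m.group(2) : null;
    }

    public static void main(String[] args) {
        String line = "Connection to node 1 (localhost/127.0.0.1:9092) could not be established. "
            + "Broker may not be available.";
        System.out.println(classify(line) + " address: " + address(line));
    }
}
```

If the line classifies as ADVERTISED, the place to look would be the broker's advertised.listeners setting rather than the Quarkus configuration.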

I also do not see the topics being created on the Kafka server.

This is with the following config set:

mp.messaging.outgoing.events-outgoing.bootstrap.servers=oqm-infra-kafka:9092
mp.messaging.outgoing.events-outgoing.connector=smallrye-kafka
mp.messaging.outgoing.events-outgoing.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

I have also added kafka.bootstrap.servers=oqm-infra-kafka:9092, with the same result.

I also tried prefixing the host in the config with OUTSIDE://, since that was the form used in the tests.

I have verified that the expected config is present in the running process by outputting the value from ConfigProvider.

Messenger code:

@Slf4j
@ApplicationScoped
public class HistoryEventNotificationService {
    
    public static final String INTERNAL_EVENT_CHANNEL = "events-internal";
    public static final String OUTGOING_EVENT_CHANNEL = "events-outgoing";
    public static final String ALL_EVENT_TOPIC = "all-events";
    
    @Inject
    @Broadcast
    @Channel(INTERNAL_EVENT_CHANNEL)
    @OnOverflow(value = OnOverflow.Strategy.DROP)//TODO:: this better https://quarkus.io/version/3.2/guides/kafka#sending-messages-with-emitter
    Emitter<EventNotificationWrapper> internalEventEmitter;
    
    @Inject
    @Broadcast
    @Channel(OUTGOING_EVENT_CHANNEL)
    @OnOverflow(value = OnOverflow.Strategy.DROP)
    Emitter<ObjectHistoryEvent> outgoingEventEmitter;
    
    /**
     * Don't call this directly; use {@link #sendEvents(Class, Collection)} instead.
     */
    @WithSpan
    @Incoming(INTERNAL_EVENT_CHANNEL)
    void sendEventOutgoing(EventNotificationWrapper notificationWrapper) {
        log.info("Sending event to external channels: {}/{}", notificationWrapper.getClass().getSimpleName(), notificationWrapper.getEvent().getId());
//      log.info("Kafka server config: {}", ConfigProvider.getConfig().getValue("mp.messaging.outgoing.events-outgoing.bootstrap.servers", String.class));
        try {
            this.outgoingEventEmitter.send(
                Message.of(
                    notificationWrapper.getEvent()
                ).addMetadata(
                    OutgoingKafkaRecordMetadata.<String>builder()
                        .withTopic(notificationWrapper.getObjectName() + "-" + notificationWrapper.getEvent().getType())
                        .build()
                ));
            this.outgoingEventEmitter.send(
                Message.of(
                    notificationWrapper.getEvent()
                ).addMetadata(
                    OutgoingKafkaRecordMetadata.<String>builder()
                        .withTopic(ALL_EVENT_TOPIC)
                        .build()
                ));
            log.debug("Sent event to external channels: {}/{}", notificationWrapper.getClass().getSimpleName(), notificationWrapper.getEvent().getId());
        } catch(Throwable e) {
            log.error("FAILED to send event to external channels: {}/{}:", notificationWrapper.getClass().getSimpleName(), notificationWrapper.getEvent().getId(), e);
            throw e;
        }
    }
    
    public void sendEvent(Class<?> objectClass, ObjectHistoryEvent event) {
        this.sendEvents(objectClass, event);
    }
    
    public void sendEvents(Class<?> objectClass, ObjectHistoryEvent... events) {
        this.sendEvents(objectClass, Arrays.asList(events));
    }
    
    public void sendEvents(Class<?> objectClass, Collection<ObjectHistoryEvent> events) {
        for (ObjectHistoryEvent event : events) {
            log.info("Sending event to internal channel: {}/{}", objectClass.getSimpleName(), event.getId());
            if (event.getId() == null) {
                throw new NullPointerException("Null ID for " + event.getType() + " event given for object of type " + objectClass.getSimpleName());
            }
            this.internalEventEmitter.send(new EventNotificationWrapper(objectClass.getSimpleName(), event));
        }
    }
    
}

What can I do to make Quarkus recognize the server config?

Edit: Interestingly, I found this line in the log during startup:

SRMSG18258: Kafka producer kafka-producer-events-outgoing, connected to Kafka brokers 'OUTSIDE://oqm-infra-kafka:9092', is configured to write records to 'events-outgoing'

This indicates that the config is in fact being picked up, but when the actual connection is made, the client somehow goes back to trying to connect to localhost.
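
To narrow this down, it may help to check from inside the deployed service's environment which of the two addresses is actually reachable. The following is a minimal sketch using only the JDK; the class and method names are hypothetical, and the host and port values are simply the ones from the config and the error message above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic: tries a plain TCP connection to each candidate address.
// It does not speak the Kafka protocol; it only shows whether the port is reachable.
public class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Configured bootstrap address vs. the address from the error message.
        String[] hosts = {"oqm-infra-kafka", "localhost"};
        for (String host : hosts) {
            System.out.println(host + ":9092 reachable = " + canConnect(host, 9092, 2000));
        }
    }
}
```

If oqm-infra-kafka:9092 is reachable but localhost:9092 is not, that would be consistent with the client bootstrapping successfully and then being redirected to an address the broker advertises (its advertised.listeners setting), though this is an assumption the probe alone cannot confirm.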
