Multiple topologies in one Quarkus-based Kafka Streams application

I am trying to build a Kafka Streams app with Quarkus. Is it possible to have multiple topologies in the same Quarkus app?

I tried to achieve it like this:

@ApplicationScoped
public class TestToplogy {

    @Produces
    public Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // ...
        return builder.build();
    }

    @Produces
    public Topology buildTopology2() {
        final StreamsBuilder builder = new StreamsBuilder();
        // ...
        return builder.build();
    }
}

Or by using multiple @ApplicationScoped classes in the same project:

@ApplicationScoped
public class TestToplogy1 {

    @Produces
    public Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // ...
        return builder.build();
    }
}
@ApplicationScoped
public class TestToplogy2 {

    @Produces
    public Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // ...
        return builder.build();
    }
}

Both approaches end in a jakarta.enterprise.inject.AmbiguousResolutionException, because the Quarkus Kafka Streams extension expects exactly one Topology bean to be resolved by injection.

Is there any way to get multiple topologies up and running in one Quarkus application (jar)?

1 Answer

The only approach that has worked for me so far is the older one described on the Quarkus blog, which creates and configures each KafkaStreams instance manually and therefore allows several instances in one application.

It has the advantage of allowing a different configuration per Kafka Streams instance, at the cost of:

  • a bit more boilerplate code and lifecycle management,
  • having to inject the configuration manually (in place of the hardcoded values shown in the example below),
  • weaker integration with the rest of the Quarkus framework (I am unsure of the actual impact).

import java.util.Properties;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

@ApplicationScoped
public class TopologyBuilderA {

    private KafkaStreams kafkaStreams;

    void buildTopologyA(@Observes StartupEvent startupEvent) {

        Properties properties = new Properties();
        // TODO: populate actual values from externalized configuration
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-topology-a");


        StreamsBuilder builder = new StreamsBuilder();
        // builder.stream(...)

        kafkaStreams = new KafkaStreams(builder.build(), properties);

        kafkaStreams.start();
    }

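    void stopStream(@Observes ShutdownEvent shutdownEvent) {
        // Close the instance started in buildTopologyA, if startup got that far
        if (kafkaStreams != null) {
            kafkaStreams.close();
        }
    }
}
// ...and repeat for TopologyBuilderB.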

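Note that the application.id (my-app-topology-a in the example above) must differ between the individual topologies. Kafka Streams uses it as the consumer group ID and as the prefix for internal topic names, so different topologies sharing one ID won't be able to register correctly with the broker.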
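
One way to keep the IDs distinct while sharing everything else is to derive each instance's Properties from a common base. The following is only a minimal sketch using plain java.util.Properties: the class name StreamsConfigs and the method forTopology are hypothetical helpers, not part of Quarkus or Kafka, and the string keys are the literal values behind StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.

```java
import java.util.Properties;

// Hypothetical helper (not a Quarkus or Kafka API): builds one Properties
// object per topology from a set of shared settings.
final class StreamsConfigs {

    private StreamsConfigs() {
    }

    // Copies the shared settings and sets a per-topology application.id.
    static Properties forTopology(Properties shared, String applicationId) {
        Properties props = new Properties();
        props.putAll(shared);
        // "application.id" is the value of StreamsConfig.APPLICATION_ID_CONFIG
        props.put("application.id", applicationId);
        return props;
    }

    public static void main(String[] args) {
        Properties shared = new Properties();
        // "bootstrap.servers" is the value of StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        shared.put("bootstrap.servers", "localhost:9092");

        Properties a = forTopology(shared, "my-app-topology-a");
        Properties b = forTopology(shared, "my-app-topology-b");

        System.out.println(a.getProperty("application.id"));
        System.out.println(b.getProperty("application.id"));
    }
}
```

Each resulting Properties object could then be passed to its own new KafkaStreams(topology, properties) call, as in the class above.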


Following the comment under the original question, I also tried custom @Qualifier annotations. These eliminated the exception, but my @Produces-annotated methods were never invoked (tested with Quarkus 3.5.1).


This has now been reported as an issue in the Quarkus repository.