I want to consume two bus service topics

133 Views Asked by At

I have two topics in the Bus service with 2 subscriptions each and from the same project I want to consume all four. I tried to consume the topics separately and it works well, I consume both subscriptions of the topic, both of one and the other, the problem is to do both at the same time. It gives me this error: 'fullyQualifiedNamespace' can't be null. But even if I put this with the correct namespaces, it overwrites it to null anyway

cloud: azure: servicebus: topic1: bindings: consumeSuscripcionUno-in-0: group: topic1 consumeSuscripcionDos-in-0: group: topic1 connection-string: "connection-string" enabled: true topic2: bindings: consumeSuscripcionUno-in-0: group: topic2 connection-string: "connection-string" enabled: true

esta es una parte de la config donde intento configurarlo

1

There are 1 best solutions below

0
Dasari Kamali On

I tried the below Java code to receive messages from two topics with two subscriptions from each.

Code :

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;

public class TopicConsumer {
    public static void main(String[] args) {

        String connection1 = "<topic1_connec_String>";
        String topic1 = "topic1";
        String subscription1 = "<topic1_subsrciptionName>";

        String connection2 = "<topic2_connec_String>";
        String topic2 = "topic2";
        String subscription2 = "<topic2_subsrciptionName>";

        String connection3 = "<topic1_connec_String>";
        String subscription3 = "<topic1_subsrciptionName>";

        String connection4 = "<topic2_connec_String>";
        String subscription4 = "<topic2_subsrciptionName>";

        ServiceBusProcessorClient processor1 = createProcessor(connection1, topic1, subscription1);
        ServiceBusProcessorClient processor2 = createProcessor(connection2, topic2, subscription2);

        ServiceBusProcessorClient processor3 = createProcessor(connection3, topic1, subscription3);
        ServiceBusProcessorClient processor4 = createProcessor(connection4, topic2, subscription4);

        startProcessor(processor1, "Topic 1, Subscription 1");
        startProcessor(processor2, "Topic 2, Subscription 2");
        startProcessor(processor3, "Topic 1, Subscription 3");
        startProcessor(processor4, "Topic 2, Subscription 4");

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            stopProcessor(processor1, "Topic 1, Subscription 1");
            stopProcessor(processor2, "Topic 2, Subscription 2");
            stopProcessor(processor3, "Topic 1, Subscription 3");
            stopProcessor(processor4, "Topic 2, Subscription 4");
        }));
    }

    private static ServiceBusProcessorClient createProcessor(String connection, String topic, String subscription) {
        return new ServiceBusClientBuilder()
                .connectionString(connection)
                .processor()
                .topicName(topic)
                .subscriptionName(subscription)
                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                .processMessage(context -> {
                    System.out.printf("Received message from %s, %s: %s%n", topic, subscription, context.getMessage().getBody().toString());
                    context.complete();
                })
                .processError(context -> {
                    System.err.printf("Error occurred: %s%n", context.getException());
                })
                .buildProcessorClient();
    }

    private static void startProcessor(ServiceBusProcessorClient processor, String processorName) {
        System.out.println("Starting the processor for " + processorName + "...");
        processor.start();
    }

    private static void stopProcessor(ServiceBusProcessorClient processor, String processorName) {
        System.out.println("Stopping the processor for " + processorName + "...");
        processor.stop();
    }
}

pom.xml :

<dependencies>
     <dependency>
          <groupId>com.azure</groupId>
          <artifactId>azure-messaging-servicebus</artifactId>
          <version>7.0.0</version>
     </dependency>
</dependencies>

I created two topics with two subscriptions in each and send messages to it, as below in Azure Service Bus.

Topic1 :

enter image description here

Topic2 :

enter image description here

Output :

It runs successfully and has received each message from the four subscriptions in the two topics as below.

enter image description here