I'm trying to receive messages from Azure Service Bus in a Java application. I created the necessary client configuration, and a connection through ManagementClient succeeds, for example:

@Bean
public ClientSettings getMessageReceiver() throws ServiceBusException, InterruptedException {

    AzureTokenCredentials azureTokenCredentials = new ApplicationTokenCredentials(
            "clientID",
            "domain",
            "secret",
            AzureEnvironment.AZURE
    );

    TokenProvider tokenProvider = TokenProvider.createAzureActiveDirectoryTokenProvider(
            new AzureAuthentication(azureTokenCredentials),
            AzureEnvironment.AZURE.activeDirectoryEndpoint(),
            null
    );

    ClientSettings clientSettings = new ClientSettings(tokenProvider,
            RetryPolicy.getDefault(),
            Duration.ofSeconds(30),
            TransportType.AMQP);

    return clientSettings;
}

ManagementClient managementClient =
        new ManagementClient(Util.convertNamespaceToEndPointURI("namespace"),
                clientSettings);
managementClient.getTopics();

But when I try to receive messages from a particular topic subscription:

        SubscriptionClient subscriptionClient = new SubscriptionClient("namespace", "events/subscriptions/subscription", clientSettings, ReceiveMode.PEEKLOCK);

I get this error message:

It is not possible for an entity that requires sessions to create a non-sessionful message receiver.

What additional steps are needed?


Your topic subscription was created with sessions enabled (they are disabled by default). If you do not need message sessions, recreate the subscription with 'requires session' disabled (NOTE: that property cannot be changed once a subscription is created).

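Or, if you really need message sessions, change your code as shown below so that it accepts a session first and then receives messages from that session. The example uses ServiceBusClientBuilder with a connection string rather than the SubscriptionClient and ClientSettings from the question. Further code samples, including one specifically for sessions, are published with the SDK.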

        // The connection string value can be obtained by:
        // 1. Going to your Service Bus namespace in Azure Portal.
        // 2. Go to "Shared access policies"
        // 3. Copy the connection string for the "RootManageSharedAccessKey" policy.
        String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};"
            + "SharedAccessKey={key}";

        // Create a receiver.
        // "<<topic-name>>" will be the name of the Service Bus topic you created inside the Service Bus namespace.
        // "<<subscription-name>>" will be the name of the session-enabled subscription.
        ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sessionReceiver()
            .receiveMode(ReceiveMode.PEEK_LOCK)
            .topicName("<<topic-name>>")
            .subscriptionName("<<subscription-name>>")
            .buildAsyncClient();

        Disposable subscription = receiver.receiveMessages()
            .flatMap(context -> {
                if (context.hasError()) {
                    System.out.printf("An error occurred in session %s. Error: %s%n",
                        context.getSessionId(), context.getThrowable());
                    return Mono.empty();
                }

                System.out.println("Processing message from session: " + context.getSessionId());

                // Process message
                return receiver.complete(context.getMessage());
            }).subscribe(aVoid -> {
            }, error -> System.err.println("Error occurred: " + error));

        // Subscribe is not a blocking call so we sleep here so the program does not end.
        TimeUnit.SECONDS.sleep(60);

        // Disposing of the subscription will cancel the receiveMessages() operation.
        subscription.dispose();

        // Close the receiver.
        receiver.close();
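
To make the "session first, then messages" order concrete, here is a small in-memory sketch of how a session-enabled subscription behaves. It is a toy model only and does not use the Azure SDK: the SessionModel and Subscription classes and their createReceiver/acceptSession methods are hypothetical names chosen for illustration. It assumes that messages are grouped by session ID and that a plain (non-session) receiver is rejected when the entity requires sessions, which is the situation the error message in the question describes.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of a session-enabled subscription; NOT the Azure SDK. */
class SessionModel {

    /** Hypothetical stand-in for a subscription with a "requires session" flag. */
    static final class Subscription {
        private final boolean requiresSession;
        // Messages grouped by session ID, kept in arrival order within each session.
        private final Map<String, Queue<String>> sessions = new LinkedHashMap<>();

        Subscription(boolean requiresSession) {
            this.requiresSession = requiresSession;
        }

        void send(String sessionId, String body) {
            sessions.computeIfAbsent(sessionId, id -> new ArrayDeque<>()).add(body);
        }

        /** A plain receiver is rejected when the entity requires sessions. */
        Queue<String> createReceiver() {
            if (requiresSession) {
                throw new IllegalStateException(
                        "entity requires sessions; accept a session first");
            }
            Queue<String> all = new ArrayDeque<>();
            sessions.values().forEach(all::addAll);
            return all;
        }

        /** Accepting a session yields only that session's messages, in order. */
        Queue<String> acceptSession(String sessionId) {
            return sessions.getOrDefault(sessionId, new ArrayDeque<>());
        }
    }

    public static void main(String[] args) {
        Subscription subscription = new Subscription(true);
        subscription.send("order-1", "created");
        subscription.send("order-2", "created");
        subscription.send("order-1", "paid");

        try {
            subscription.createReceiver();
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        // Accept the session first, then drain its messages.
        Queue<String> session = subscription.acceptSession("order-1");
        while (!session.isEmpty()) {
            System.out.println("order-1 -> " + session.poll());
        }
    }
}
```

In the real service the broker enforces this rule, so the fix is either a subscription without 'requires session' or a receiver that accepts sessions, as in the answer above.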