The token is expired while publish the message to servicebus using jms

141 Views Asked by At

The technologies we are currently using are:

  1. Springboot(3.1.2)
  2. Azure ServiceBus(Topic)
  3. Spring JMS

We have followed the jms in Spring to access Azure Service Bus Documentation. If you need to know further.

I provide a high-level overview, We used JMS to handle message publishing(JMSTemplate) and listening(JMSListner) in the Azure ServiceBus Topic. JMS manages authorization and token generation, and we established the connection using clientId/secret.

Here we have to publish nearly millions of messages on the topic. Ultimately, The publishing process is too long. So in between we fetch the below-mentioned error while the publishing message process is ongoing. As a result, we may miss some messages in the topic.

org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:311) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:184) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:510) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:587) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:664) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at com.cofinity.gr.orchestration.azure.servicebus.AzureServiceBusProvider.publishMessage(AzureServiceBusProvider.java:34) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.azure.servicebus.AzureServiceBusProvider.publishMessage(AzureServiceBusProvider.java:27) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.service.RawDataService.waitForCurationMessage(RawDataService.java:932) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.service.RawDataService.processClusterEntitiesWithValidationFailed(RawDataService.java:272) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.service.RawDataService.processToGenerateBpnV1(RawDataService.java:214) ~[classes!/:0.0.1-SNAPSHOT]
    at jdk.internal.reflect.GeneratedMethodAccessor188.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Unknown Source) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-6.0.11.jar!/:6.0.11]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:704) ~[spring-aop-6.0.11.jar!/:6.0.11]
    at com.cofinity.gr.orchestration.service.RawDataService$$SpringCGLIB$$0.processToGenerateBpnV1(<generated>) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.tamr.bpn.BPNProcessor.process(BPNProcessor.java:32) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.tamr.bpn.BPNProcessor.process(BPNProcessor.java:20) ~[classes!/:0.0.1-SNAPSHOT]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:146) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:322) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:220) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:389) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:313) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-6.0.11.jar!/:6.0.11]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:256) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:362) ~[spring-batch-infrastructure-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:206) ~[spring-batch-infrastructure-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:139) ~[spring-batch-infrastructure-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:241) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:227) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:153) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:417) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:132) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:316) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:157) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.core.task.SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run(SimpleAsyncTaskExecutor.java:285) ~[spring-core-6.0.11.jar!/:6.0.11]
    at java.base/java.lang.Thread.run(Unknown Source) ~[na:na]
Caused by: jakarta.jms.JMSException: ExpiredToken: The token is expired. TrackingId:17702e61-fdc7-4d78-9cb8-fd482356a53e_G13, SystemTracker:NoSystemTracker, Timestamp:2024-01-29T11:26:36 TrackingId:f32eed2ed4f849219cd6513f2ae5e58f_G13, SystemTracker:gateway7, Timestamp:2024-01-29T11:26:36 [condition = com.microsoft:auth-failed]
    at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsConnection.send(JmsConnection.java:778) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsNoTxTransactionContext.send(JmsNoTxTransactionContext.java:37) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsSession.send(JmsSession.java:976) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsSession.send(JmsSession.java:855) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsMessageProducer.sendMessage(JmsMessageProducer.java:252) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsMessageProducer.send(JmsMessageProducer.java:200) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.messaginghub.pooled.jms.JmsPoolMessageProducer.sendMessage(JmsPoolMessageProducer.java:194) ~[pooled-jms-3.1.0.jar!/:na]
    at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:88) ~[pooled-jms-3.1.0.jar!/:na]
    at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:77) ~[pooled-jms-3.1.0.jar!/:na]
    at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:637) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:611) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.lambda$send$3(JmsTemplate.java:589) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507) ~[spring-jms-6.0.11.jar!/:6.0.11]
    ... 36 common frames omitted
Caused by: org.apache.qpid.jms.provider.ProviderException: ExpiredToken: The token is expired. TrackingId:17702e61-fdc7-4d78-9cb8-fd482356a53e_G13, SystemTracker:NoSystemTracker, Timestamp:2024-01-29T11:26:36 TrackingId:f32eed2ed4f849219cd6513f2ae5e58f_G13, SystemTracker:gateway7, Timestamp:2024-01-29T11:26:36 [condition = com.microsoft:auth-failed]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:305) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:191) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:132) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:992) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:878) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:548) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:541) ~[qpid-jms-client-2.0.0.jar!/:na]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383) ~[netty-handler-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246) ~[netty-handler-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295) ~[netty-handler-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[netty-transport-classes-epoll-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499) ~[netty-transport-classes-epoll-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397) ~[netty-transport-classes-epoll-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.94.Final.jar!/:4.1.94.Final]
    ... 1 common frames omitted

After a few minutes, JMS will again generate the token for the provider, and message publishing will resume.

1

There are 1 best solutions below

0
Suresh Chikkam On

So in between we fetch the below-mentioned error while the publishing message process is ongoing. As a result, we may miss some messages in the topic.

You need to implement token refreshing logic. The token should be refreshed before it expires to maintain continuous communication with Azure Service Bus.

Token refreshing implementation:

import com.microsoft.azure.servicebus.primitives.ServiceBusException;

public class MessageSender {

    private TokenManager tokenManager;
    private ServiceBusClient serviceBusClient;

    public MessageSender(TokenManager tokenManager, ServiceBusClient serviceBusClient) {
        this.tokenManager = tokenManager;
        this.serviceBusClient = serviceBusClient;
    }

    public void sendMessage(String message) {
        try {
            // Send message using the current token
            serviceBusClient.sendMessage(message);
        } catch (ServiceBusException ex) {
            if (ex instanceof ExpiredTokenException) {
                // Token expired, refresh the token
                String newToken = tokenManager.refreshToken();
                // Update the authentication credentials
                serviceBusClient.updateCredentials(newToken);
                // Retry sending the message
                serviceBusClient.sendMessage(message);
            } else {
                // Handle other exceptions
                ex.printStackTrace();
            }
        }
    }
}

public class TokenManager {

    public String refreshToken() {
        // Logic to refresh the token
        // Make a request to Azure Active Directory or other authentication provider
        // Return the new token
    }
}

public class ServiceBusClient {

    private String token;

    public ServiceBusClient(String token) {
        this.token = token;
    }

    public void sendMessage(String message) throws ServiceBusException {
        // Send message using the token
    }

    public void updateCredentials(String newToken) {
        // Update the token
        this.token = newToken;
    }
}
  • TokenManager is responsible for refreshing the token, ServiceBusClient encapsulates the communication with Azure Service Bus, and MessageSender sends messages using the ServiceBusClient

When an ExpiredTokenException is encountered, the token is refreshed, and the operation is retried with the new token.

enter image description here

The ServiceBusClient class encapsulates the logic for sending messages to Azure Service Bus. It maintains the current authentication token and updates it when necessary.

Reference: