C# - Azure Service Bus deployed in VM suddenly hangs and stops processing messages


I'm new to processing messages with Azure Service Bus. Any help would be greatly appreciated.

Messages are queued to our Azure Service Bus through an API, and the received messages are processed on 4 different VMs, each running a "Task Engine" Windows service. Each VM opens its own connection, but the tenant ID, client ID, and namespace are the same for all 4 of them. We are not using partitioned threads. In all lower environments the code runs fine, but in PROD (about 7 million messages queued and processed per day) a VM suddenly stops processing messages and just hangs. The Site Reliability team has not found any CPU or memory issues on the VM. Every time this happens, the "Task Engine" Windows service on that VM has to be restarted before Service Bus message processing resumes.

Here is what the metrics look like in Azure Service Bus: [image: Azure Metrics]

Configuration

  • AutoCompleteMessages = false

  • PrefetchCount = 50

  • ReceiveMode = PeekLock

  • MaxConcurrentCalls = 5

  • MaxAutoLockRenewalDuration = 15 min (900 seconds)

  • MessageLockDuration = 5 min
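A rough back-of-the-envelope check of how these settings interact, offered as a sketch rather than a diagnosis. It assumes (this is my understanding of the SDK, not something confirmed in our logs) that the lock on a prefetched message starts counting when the message lands in the local prefetch buffer, and that automatic lock renewal only begins once the message is handed to the handler. The class and method names are hypothetical.

```csharp
using System;

// Hypothetical helper for illustration only; not part of the Task Engine code.
public static class PrefetchLockBudget
{
    // Estimates the longest average handling time (in seconds) at which the
    // last message of a full prefetch buffer is still dispatched to a handler
    // before its lock expires, under the assumptions stated in the lead-in.
    public static double MaxAverageHandlingSeconds(
        int prefetchCount, int maxConcurrentCalls, TimeSpan lockDuration)
    {
        // Number of handler "rounds" needed to drain a full prefetch buffer.
        double rounds = Math.Ceiling((double)prefetchCount / maxConcurrentCalls);

        // Each round may take at most this long on average.
        return lockDuration.TotalSeconds / rounds;
    }
}
```

With the values above, `PrefetchLockBudget.MaxAverageHandlingSeconds(50, 5, TimeSpan.FromMinutes(5))` gives 30: a full buffer of 50 messages drains in 10 rounds of 5, so if handling takes longer than roughly 30 seconds per message on average, messages at the back of the buffer could lose their lock before a handler ever sees them.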

Code:

The following code initializes the Azure connection when the Task Engine is started for the first time; the connection lives until the engine is killed or restarted.

private static void initializeAzureServiceBusClientConnection()
{
    if (_isAzureEnabled)
    {
        var _tenantId = GetFromConfig.AppSettings["tenant-id"];
        var _clientId = GetFromConfig.AppSettings["client-id"];
        var _clientSecret = GetFromConfig.AppSettings["client-secret"];
        var _servicebusNamespace = GetFromConfig.AppSettings["servicebus-namespace"];

        var _token = new ClientSecretCredential(_tenantId, _clientId, _clientSecret);
    
        var clientOptions = new ServiceBusClientOptions()
        {
            TransportType = ServiceBusTransportType.AmqpWebSockets,
        };

        ServiceBusClient = new ServiceBusClient(_servicebusNamespace, _token, clientOptions);
    }
}

The following code processes messages. (I haven't included the code that queues messages; there are no issues there.)

using Azure.Messaging.ServiceBus;

namespace TaskEngine
{
    internal sealed class AzureManager
    {
        private Thread _threadQueue;

        //Called once when Task Engine starts
        internal void Startup()
        {
            if (TaskEngineManager.IsAzureBusinessEventsEnabled)
            {
                Logger.Log(_isRunning == false, "The AzureManager startup method was called multiple times without a stop.");
                _isRunning = true;
                _messageQueueName = System.Configuration.ConfigurationManager.AppSettings["servicebus-queue"];
                _invalidMessageQueueName = System.Configuration.ConfigurationManager.AppSettings["servicebus-invalid-queue"];

                getAzureServiceBusAccess();

                // Start up background thread to manage queue
                _threadQueue = new Thread(queueLoop);                       //We are running in Thread because there is another process that runs in parallel
                _threadQueue.Name = "Azure Business Event Queue Thread";
                _threadQueue.Start();
            }
            else
            {
                Tracer.RaiseInfo(TraceSourceNames.AzureManager, "Azure Service Bus is disabled.");
            }
        }


        private void queueLoop()
        {
            bool isConnectionReset;

            do
            {
                isConnectionReset = false; //Reset in each loop
                isConnectionReset = AsyncGatekeeper.Run(() => processUsingAzureServiceBusMQAsync());
            } while (_isWorking && isConnectionReset);
        }



        private async System.Threading.Tasks.Task<bool> processUsingAzureServiceBusMQAsync()
        {
            if (_isWorking)
            {
                try
                {
                    // add handler to process messages
                    _serviceBusReceiverProcessor.ProcessMessageAsync += MessageHandlerAsync;

                    // add handler to process any errors
                    _serviceBusReceiverProcessor.ProcessErrorAsync += ErrorHandlerAsync;

                    await _serviceBusReceiverProcessor.StartProcessingAsync();

                }
                catch (Exception ex)
                {
                    Tracer.RaiseError(TraceSourceNames.AzureManager, "Azure Service Bus unhandled exception occurred. Attempting to reset connection.", ex);
                    try
                    {
                        bool isTaskCompleted = await closeAndDisposeConnectionAsync();
                        if (isTaskCompleted)
                        {
                            tryResetConnections(ex);
                            cooldownReceiveRequests();
                        }
                    }
                    catch (Exception)
                    {
                        Tracer.RaiseError(TraceSourceNames.AzureManager, "Azure Service Bus unhandled exception occurred. Reset connection failed.", ex);
                    }
                    return true;    //reset retry loop
                }
            }
            return false;
        }


        async System.Threading.Tasks.Task MessageHandlerAsync(ProcessMessageEventArgs args)
        {
            var message = args.Message;
            try
            {
                // start processing 
                Tracer.RaiseInfo(TraceSourceNames.AzureManager, $"Azure Service Bus Event Queue thread received message {message.MessageId}.");
                var _messageBody = message.Body;

                bool isSuccess = processMessage(message.MessageId, _messageBody.ToStream());

                if (!isSuccess)
                {
                    await WriteToInvalidQueueAsync(_messageBody);
                }

                args.MessageLockLostAsync += OnMessageLockLostAsync;

                // complete the message. message is deleted from the queue. 
                try
                {
                    await args.CompleteMessageAsync(message);
                }
                catch (Exception e)
                {
                    Tracer.RaiseWarning(TraceSourceNames.AzureManager, "Azure service bus message lock already released when completing message. Continuing processing messages.", e);
                }

                Tracer.RaiseInfo(TraceSourceNames.AzureManager, $"Azure Service Bus Event Queue thread completed processing message {message.MessageId}.");
            }
            catch (Exception ex)
            {
                Tracer.RaiseError(TraceSourceNames.AzureManager, "Unexpected error occurred moving task from Azure Service Bus to database; attempting to re-queue message.", ex);
                try
                {
                    await args.AbandonMessageAsync(args.Message);
                }
                catch (Exception e)
                {
                    Tracer.RaiseWarning(TraceSourceNames.AzureManager, "Azure service bus message lock already released when abandoning message.", e);
                }
            }
        }

        System.Threading.Tasks.Task ErrorHandlerAsync(ProcessErrorEventArgs args)
        {
            var errMessage = $"Azure Service Bus error handler caught exception. Message processing will retry. Error Source: {args.ErrorSource}; EntityPath: {args.EntityPath}; FullyQualifiedNamespace: {args.FullyQualifiedNamespace}";
            Tracer.RaiseError(TraceSourceNames.AzureManager, errMessage, args.Exception);
            return System.Threading.Tasks.Task.CompletedTask;
        }

        System.Threading.Tasks.Task OnMessageLockLostAsync(MessageLockLostEventArgs args)
        {
            string errMessage = $"Azure Service Bus Message lock Lost handler caught exception. Message: {args.Message}. Message Locked Until: {args.Message.LockedUntil.ToLocalTime()}";

            var messageInfo = convertMessage(args.Message.Body.ToStream());
            if (messageInfo != null)
            {
                errMessage = $"Azure Service Bus Message Lock Lost handler caught exception. Message processing will retry. Message Info: {messageInfo}. Message Locked Until: {args.Message.LockedUntil.ToLocalTime()}";
            }

            Tracer.RaiseError(TraceSourceNames.AzureManager, errMessage, args.Exception);
            return System.Threading.Tasks.Task.CompletedTask;
        }

        //Called when Task Engine is started for the first time and any time when connection is lost and needs to be reset
        private static void getAzureServiceBusAccess()
        {
            Logger.Log(!(string.IsNullOrEmpty(_messageQueueName)
                            || string.IsNullOrEmpty(_invalidMessageQueueName))
                            , "Incomplete Azure Service Bus configuration settings. Unable to proceed.");

            Tracer.RaiseInfo(TraceSourceNames.AzureManager, $"Attempting to connect to Azure Service Bus.");
            _serviceBusSender = TaskEngineManager.ServiceBusClient.CreateSender(_invalidMessageQueueName, new ServiceBusSenderOptions());

            var processorOptions = new ServiceBusProcessorOptions()
            {
                MaxConcurrentCalls = 5,
                PrefetchCount = 50,
                MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(900),     //15 min. This lock renewal duration should be greater than the queue's LockDuration, which is set to 5 min.
                AutoCompleteMessages = false,
            };
            _serviceBusReceiverProcessor = TaskEngineManager.ServiceBusClient.CreateProcessor(_messageQueueName, processorOptions);
        }

        private void tryResetConnections(Exception exception)
        {
            Tracer.RaiseInfo(TraceSourceNames.AzureManager, "Resetting Azure Service Bus connection and retrying.");

            try
            {
                if (DateTime.Now.Subtract(LastQueueReset).TotalSeconds > 1800)
                {
                    LastQueueReset = DateTime.Now;
                    getAzureServiceBusAccess();
                    Tracer.RaiseInfo(TraceSourceNames.AzureManager, "Azure Service Bus Connection successfully reset.");
                }
                else
                {
                    NotificationRegulator.Notify(NotificationReason.AzureServiceBusFailure, exception, Resources.AzureServiceBus_ConnectError);
                }
            }
            catch (Exception ex)
            {
                Tracer.RaiseError(TraceSourceNames.AzureManager, "Resetting Azure Service Bus connection failed.", ex);
                throw;
            }
        }

        private static async System.Threading.Tasks.Task<bool> closeAndDisposeConnectionAsync()
        {
            try
            {
                await _serviceBusReceiverProcessor.StopProcessingAsync();
                await _serviceBusReceiverProcessor.DisposeAsync();
            }
            catch (Exception ex)
            {
                //Do not throw and eat exception - Receiver may have been already disposed
                Tracer.RaiseWarning(TraceSourceNames.AzureManager, "Error disposing Azure Service Bus Receiver Connection.", ex);
            }

            try
            {
                await _serviceBusSender.DisposeAsync();
            }
            catch (Exception ex)
            {
                //Do not throw and eat exception - Sender may have been already disposed
                Tracer.RaiseWarning(TraceSourceNames.AzureManager, "Error disposing Azure Service Bus Sender Connection.", ex);
            }

            return true;
        }
    }
}

We added a lot of logging to investigate the issue, but the only error we see is an occasional LockLost exception. The connection is not lost, and it is not disposed or closed automatically; it only closes when the service is manually restarted.

Please help!

We played around with the configuration settings to see if that would help, but nothing changed. We are restarting the Windows service that processes messages at least three times a day.
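Since the hang produces no errors in the logs, one thing that might help narrow it down is timing the synchronous work inside the message handler. With MaxConcurrentCalls = 5, five handlers that never return would be enough to stall the processor without raising any exception. Below is a minimal sketch of such a check, using only the .NET base class library. `HandlerWatchdog` and `RunWithTimeoutAsync` are hypothetical names, and the idea of wrapping the `processMessage` call this way is an assumption, not something we have deployed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical diagnostic helper; not part of the Task Engine code above.
public static class HandlerWatchdog
{
    // Runs blocking work on the thread pool and waits up to 'timeout' for it.
    // Returns the work's result if it finished in time, or null on timeout.
    // Note: work that times out is NOT cancelled; it keeps running in the
    // background, so this only detects a stuck call, it does not fix it.
    public static async Task<bool?> RunWithTimeoutAsync(Func<bool> work, TimeSpan timeout)
    {
        Task<bool> workTask = Task.Run(work);

        using (var timerCancellation = new CancellationTokenSource())
        {
            Task delayTask = Task.Delay(timeout, timerCancellation.Token);
            Task finished = await Task.WhenAny(workTask, delayTask);

            if (finished == workTask)
            {
                // Stop the timer; awaiting workTask rethrows any exception from the work.
                timerCancellation.Cancel();
                return await workTask;
            }

            return null; // the work is still running after the timeout
        }
    }
}
```

In the handler, the call could look like `bool? result = await HandlerWatchdog.RunWithTimeoutAsync(() => processMessage(id, body), TimeSpan.FromMinutes(2));` followed by an error log entry when `result` is null. The 2-minute value is an arbitrary placeholder.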
