Unable to read message from Service bus. it is returning null

1k Views Asked by At

I am trying to read the stream that is on Azure Service bus. But I am getting null bytes. File size that is getting created is same as the size of the bytes that are being sent, but it contains all nulls. I have added the code that is used for converting Stream to Byte array with the function name ReadAllBytes(Stream source)

below is the code for reference:

         static void Main(string[] args)
         {
            MemoryStream largeMessageStream = new MemoryStream();
            #region ReceiveMessage


            var msg = Microsoft.ServiceBus.NamespaceManager.CreateFromConnectionString(ConfigurationSettings.AppSettings["Microsoft.ServiceBus.ConnectionString"].ToString());
            var numofmessages = msg.GetQueue(AccountDetails.QueueName).MessageCountDetails.ActiveMessageCount.ToString();
            if (msg.GetQueue(AccountDetails.QueueName).RequiresSession)
            {
                var queueClient1 = QueueClient.CreateFromConnectionString(ConfigurationSettings.AppSettings["Microsoft.ServiceBus.ConnectionString"].ToString(), AccountDetails.QueueName);
                var session = queueClient1.AcceptMessageSession();

                Console.WriteLine("Message session Id: " + session.SessionId);
                Console.Write("Receiving sub messages");

                while (true)
                {
                    // Receive a sub message
                    BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5));

                    if (subMessage != null)
                    {
                        // Copy the sub message body to the large message stream.
                        Stream subMessageStream = subMessage.GetBody<Stream>();
                        subMessageStream.CopyTo(largeMessageStream);
                        // Mark the message as complete.
                        subMessage.Complete();
                        Console.Write(".");
                    }
                    else
                    {
                        // The last message in the sequence is our completeness criteria.
                        Console.WriteLine("Done!");
                        break;
                    }
                }

                // Create an aggregated message from the large message stream.
                BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true);

                Console.WriteLine("Received message");
                Console.WriteLine("Message body size: " + largeMessageStream.Length);

                string testFile = @"D:\Dev\csvData1.csv";
                Console.WriteLine("Saving file: " + testFile);

                // Save the message body as a file.
                Stream resultStream = largeMessage.GetBody<Stream>();
                byte[] x = ReadAllBytes(resultStream);

                File.WriteAllBytes(testFile, x);
            }


    public static byte[] ReadAllBytes(Stream source)
    {
        long originalPosition = source.Position;
        source.Position = 0;

        try
        {
            byte[] readBuffer = new byte[source.Length];
            int totalBytesRead = 0;
            int bytesRead;
            while ((bytesRead = source.Read(readBuffer, totalBytesRead, readBuffer.Length - totalBytesRead)) > 0)
            {
                totalBytesRead += bytesRead;
                if (totalBytesRead == readBuffer.Length)
                {
                    int nextByte = source.ReadByte();
                    if (nextByte != -1)
                    {
                        byte[] temp = new byte[readBuffer.Length * 2];
                        Buffer.BlockCopy(readBuffer, 0, temp, 0, readBuffer.Length);
                        Buffer.SetByte(temp, totalBytesRead, (byte)nextByte);
                        readBuffer = temp;
                        totalBytesRead++;
                    }
                }
            }

            byte[] buffer = readBuffer;
            if (readBuffer.Length != totalBytesRead)
            {
                buffer = new byte[totalBytesRead];
                Buffer.BlockCopy(readBuffer, 0, buffer, 0, totalBytesRead);
            }
            return buffer;
        }
        finally
        {
            source.Position = originalPosition;
        }
    }
1

There are 1 best solutions below

1
On

I send a message with stream body using the following code, and then I receive the message and test your code on my side, the code works for me.

using (MemoryStream stream = new MemoryStream(File.ReadAllBytes(@"C:\Users\xxx\Desktop\source.txt")))
{
    client.Send(new BrokeredMessage(stream));
}

My source.txt

"ID", "Age", "Rich", "timestamp"
1, "50", "Y", "2017-06-06 14:19:21.77"
2, "22", "N", "2017-06-06 14:19:21.77"

Byte array

enter image description here

Console app output

enter image description here

csvData1.csv

enter image description here

If possible, you can try to send a new message with stream body and execute your code to check if it can works fine.