Event Hub message distribution


We are using IoT Hub to ingest device data, with 10 partitions for processing messages. Running 5 workers therefore has each instance processing 2 partitions.

What we have found is that if a device connects and offloads, let's say, 500 messages to the hub, all of those messages come out of a single partition, even if the other partitions are sitting idle.
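
For context, IoT Hub's Event Hub-compatible endpoint assigns device-to-cloud messages to partitions by hashing the device ID, so one device's messages always land on the same partition. A rough illustration of the idea (the real hash is internal to the service; this is only to show why one chatty device saturates a single partition):

    // Illustrative only: the partition index is a pure function of the
    // device ID, so every message from one device maps to one partition.
    static int PartitionFor(string deviceId, int partitionCount)
    {
        return (int)((uint)deviceId.GetHashCode() % (uint)partitionCount);
    }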

Is designing a system around routes/endpoints the only option for spreading this load?

We are using MQTT on a low-level device, so changing the device firmware is not possible in the short term, though it is possible long term.

I will be honest: I thought messages were fed to whichever partition had the least load; even round robin would be better. We will most likely need to feed the messages into a queue and process them from there to scale properly (a sketch of that hand-off is below). For now, spinning up multiple tasks to process each partition's IEnumerable<EventData> batch is MUCH better, thank goodness. The bottleneck will still exist in some form until we move to that queue and scale there instead.
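
A minimal sketch of that queue hand-off, assuming the older Microsoft.ServiceBus.Messaging SDK (the same package that provides EventProcessorHost), a hypothetical queue named "ingest", a _serviceBusConnectionString configured elsewhere, and System.IO imported for MemoryStream:

    async Task ForwardToQueueAsync(IEnumerable<EventData> messages)
    {
        // Re-publish each event body to Service Bus so downstream workers
        // can scale independently of the partition count.
        var client = QueueClient.CreateFromConnectionString(_serviceBusConnectionString, "ingest");
        foreach (EventData message in messages)
        {
            await client.SendAsync(new BrokeredMessage(new MemoryStream(message.GetBytes()), ownsStream: true));
        }
    }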

UPDATE - Adding some example code showing what I'm doing. Just a heads up: we now have a 10-fold performance improvement by processing each batch of messages via multiple tasks. I will be refactoring this code in our next release, but for now it is working great.

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        await ProcessEventsBulk(context, messages);
    }

    async Task ProcessEventsBulk(PartitionContext context, IEnumerable<EventData> messages)
    {
        // Fan out: kick off one task per message so the batch is processed
        // concurrently, then wait for the whole batch to finish.
        var tasks = new List<Task>();
        foreach (EventData message in messages)
        {
            tasks.Add(Task.Run(() => GoBoy(context, message)));
        }
        await Task.WhenAll(tasks);
    }

    async Task GoBoy(PartitionContext context, EventData message)
    {
        // One DbContext per message: DbContext is not thread-safe, so each
        // concurrent task needs its own instance.
        using (var db = new AppDbContext(_dbContextConnectionString))
        {
            await ProcessEvent(message, context.Lease.PartitionId, new CoreManagerContainer(db), db);
            await db.SaveChangesAsync();
        }
    }
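
One caveat with the fan-out above: Task.Run per message is unbounded, so a 500-message burst spins up 500 concurrent tasks and DbContext instances at once. A minimal sketch of a bounded variant using SemaphoreSlim (the cap of 32 is an assumed starting point to tune against your SQL/DocumentDB limits, and System.Linq is assumed to be imported):

    async Task ProcessEventsBulkThrottled(PartitionContext context, IEnumerable<EventData> messages)
    {
        // Cap concurrency so a large batch can't exhaust connections or RUs.
        using (var throttle = new SemaphoreSlim(32))
        {
            var tasks = messages.Select(async message =>
            {
                await throttle.WaitAsync();
                try
                {
                    await GoBoy(context, message);
                }
                finally
                {
                    throttle.Release();
                }
            }).ToList();

            await Task.WhenAll(tasks);
        }
    }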

ProcessEvent does the following (a hypothetical skeleton follows the list):

  • Decode Packet
  • Log to Azure Table Storage via NLog
  • Update values in SQL (device table in particular)
  • Insert message into DocumentDB
  • Forward message onto 2 queues
  • Respond to the unit
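
Roughly, that pipeline has the shape below. To be clear, every helper and field name here (Decoder, UpdateDeviceAsync, _docClient, _queueA/_queueB, RespondToUnitAsync) is a placeholder for our internals, not real API; only the signature matches the GoBoy call above:

    // Hypothetical skeleton of the six steps; all helpers are placeholders.
    async Task ProcessEvent(EventData message, string partitionId, CoreManagerContainer core, AppDbContext db)
    {
        var packet = Decoder.Decode(message.GetBytes());              // 1. decode packet
        _logger.Info("P{0} received {1}", partitionId, packet.Id);    // 2. NLog (Azure Table Storage target)
        await core.UpdateDeviceAsync(packet, db);                     // 3. update SQL (device table)
        await _docClient.CreateDocumentAsync(_collectionUri, packet); // 4. insert into DocumentDB
        await _queueA.SendAsync(new BrokeredMessage(packet.Id));      // 5. forward onto two queues
        await _queueB.SendAsync(new BrokeredMessage(packet.Id));
        await RespondToUnitAsync(packet);                             // 6. respond so the unit clears the message
    }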

I know we can split these into separate workers, but I love that it is "real time" and safer, since the unit doesn't clear the message until we acknowledge it.

I created a branch with some early code that does batch inserting into DocumentDB prior to ACKing. I didn't see a major improvement, but I'm assuming it should help with RU consumption.
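
For reference, the batching is roughly this shape, assuming the DocumentDB SDK's DocumentClient with _docClient and _collectionUri (e.g. from UriFactory.CreateDocumentCollectionUri) initialized elsewhere: fire the inserts concurrently and only return, and therefore only ACK, once the whole batch has landed:

    async Task InsertBatchAsync(IEnumerable<object> decodedMessages)
    {
        // Issue all inserts concurrently; the RU charge per document is the
        // same, but far less wall-clock time than inserting serially.
        var inserts = decodedMessages
            .Select(doc => _docClient.CreateDocumentAsync(_collectionUri, doc))
            .ToList();

        await Task.WhenAll(inserts);
    }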
