MQTTNet, get message and print from a server every 2 seconds

I'm using MQTTnet to get the status of a 3D printer every two seconds and print it to the console. The first message prints fine, but the second prints twice, the third three times, and so on. I want each message to print only once. The method I use to print the status is as follows:

static async Task ObserverAsync(IMqttClient mqttClient, CancellationToken cancellationToken)
{
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {

            //Time span
            await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);

            // Callback function when a message is received
            mqttClient.ApplicationMessageReceivedAsync += e =>
            {
                var msg = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);

                if (/*significant condition*/)
                {
                    Console.WriteLine($"Received message: {msg}");
                }

                return Task.CompletedTask;
            };
        }
    }
    catch(TaskCanceledException)
    {
        Console.WriteLine("Observer Disabled");
    }
}

From what I've researched, the problem may be in how I handle "ApplicationMessageReceivedAsync", since it seems that I am subscribing to the event multiple times. I'm not that familiar with async code and event handlers, so I'm not sure. Stepping through with the debugger, I noticed that MQTTnet internally fills a collection called "_handlersForInvoke" every time I subscribe to "ApplicationMessageReceivedAsync". That could be a clue, but I don't know how to keep that collection from growing, and I don't think doing it "by force" is a good idea either.

There are 2 best solutions below

Best answer

Your current implementation adds a new handler inside the loop, so every iteration registers one more handler. Each incoming message is then passed to every registered handler, which is why the number of printed copies grows by one per iteration, as you observed.
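
To make the accumulation concrete, here is a minimal sketch in plain C# that does not use MQTTnet at all. `FakeClient` and its `MessageReceived` event are hypothetical stand-ins for the real client and its receive event; the only point is to compare subscribing inside the loop with subscribing once.

```csharp
using System;
using System.Collections.Generic;

public static class HandlerAccumulationDemo
{
    // Hypothetical stand-in for a client that raises an event per message.
    public sealed class FakeClient
    {
        // Initialized with a no-op so the event can always be invoked safely.
        public event Action<string> MessageReceived = delegate { };

        public void Deliver(string message) => MessageReceived(message);
    }

    // Subscribes inside the loop, as in the question:
    // one more handler is registered on every iteration.
    public static List<string> SubscribeInsideLoop(int iterations)
    {
        var client = new FakeClient();
        var printed = new List<string>();
        for (var i = 1; i <= iterations; i++)
        {
            client.MessageReceived += msg => printed.Add(msg);
            client.Deliver($"status {i}");
        }
        return printed;
    }

    // Subscribes once before the loop: each message is handled exactly once.
    public static List<string> SubscribeOnce(int iterations)
    {
        var client = new FakeClient();
        var printed = new List<string>();
        client.MessageReceived += msg => printed.Add(msg);
        for (var i = 1; i <= iterations; i++)
        {
            client.Deliver($"status {i}");
        }
        return printed;
    }

    public static void Main()
    {
        Console.WriteLine(SubscribeInsideLoop(3).Count); // 1 + 2 + 3 = 6
        Console.WriteLine(SubscribeOnce(3).Count);       // 3
    }
}
```

With three iterations, the first variant records six entries (one, then two, then three copies), which matches the pattern described in the question.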

Given what you are trying to do, a different approach may work better, such as polling for messages instead of attaching a handler. For example:

static async Task ObserverAsync(IMqttClient mqttClient, CancellationToken cancellationToken)
{
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            //Time span
            await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);

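            // Wait for a message. Receive is illustrative here: check that your
            // MQTTnet version's IMqttClient actually exposes such a method.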
            var mqttPacket = await mqttClient.Receive(cancellationToken);

            var msg = Encoding.UTF8.GetString(mqttPacket.PayloadSegment);

            if (/*significant condition*/)
            {
                Console.WriteLine($"Received message: {msg}");
            }
        }
    }
    catch(TaskCanceledException)
    {
        Console.WriteLine("Observer Disabled");
    }
}
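
If the client offers no direct receive call, one possible way to get the same polling behaviour is to let a single handler push messages into a `System.Threading.Channels` channel and have the loop read from it. The sketch below is only an illustration under that assumption: `PollingSketch`, `CreateInbox`, and `PollAsync` are hypothetical names, and the MQTTnet wiring is shown as a comment because it assumes the 4.x event and `PayloadSegment` used in the question.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PollingSketch
{
    // Hypothetical inbox: the single receive handler writes here, the loop reads.
    // Capacity 1 with DropOldest keeps only the most recent status message.
    public static Channel<string> CreateInbox() =>
        Channel.CreateBounded<string>(new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });

    // Wiring with MQTTnet 4.x (assumed; register once, before PollAsync starts):
    //
    // mqttClient.ApplicationMessageReceivedAsync += e =>
    // {
    //     inbox.Writer.TryWrite(
    //         Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment));
    //     return Task.CompletedTask;
    // };

    // Checks the inbox once per interval until cancelled and hands
    // any available message to onStatus.
    public static async Task PollAsync(
        ChannelReader<string> inbox,
        TimeSpan interval,
        Action<string> onStatus,
        CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await Task.Delay(interval, cancellationToken);

                // Non-blocking read: does nothing if no message has arrived.
                if (inbox.TryRead(out var msg))
                {
                    onStatus(msg);
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Observer Disabled");
        }
    }
}
```

Calling `PollAsync(inbox.Reader, TimeSpan.FromSeconds(2), msg => Console.WriteLine($"Received message: {msg}"), cancellationToken)` would then print at most one message every two seconds, however many arrived in between. Whether dropping older messages is acceptable depends on the application.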

Second answer

Thanks to Ivan's observation, I have corrected how I register the ApplicationMessageReceivedAsync handler. The solution looks like this, in case anyone needs it in the future (me included):

static async Task ObserverAsync(IMqttClient mqttClient, CancellationToken cancellationToken)
{
    // Named handler, so it is registered exactly once and can be removed again
    Func<MqttApplicationMessageReceivedEventArgs, Task> onMessage = async e =>
    {
        // Acknowledge manually instead of letting the client do it
        e.AutoAcknowledge = false;

        var msg = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);

        await e.AcknowledgeAsync(cancellationToken);

        if (/*significant condition*/)
        {
            Console.WriteLine($"Received message: {msg}");
        }
    };

    // Subscribe once, outside the loop
    mqttClient.ApplicationMessageReceivedAsync += onMessage;

    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            //Time span
            await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);
        }
    }
    catch (TaskCanceledException)
    {
        Console.WriteLine("Observer Disabled");
    }
    finally
    {
        // Remove the handler so a later call does not stack a second one
        mqttClient.ApplicationMessageReceivedAsync -= onMessage;
    }
}