Up-to-date example of NetMQ multithreading

I am trying to create a NetMQ test program with a single server and multiple clients (I believe this calls for the Router-Dealer pattern). The server should process each client's request on a worker running on a separate thread and reply to each client individually. However, I cannot find a single example that works with the current NuGet version (v4.0.1.5 at the time of writing).

Each client just sends a request (which carries its identity, since it is a Dealer socket talking to a Router) and gets its own dedicated response from the server. The server should have a pool of threads (IOCP style) that consume messages and send back responses.

I found the Multithreaded example in the NetMQ/Samples GitHub repository, which seems to be what I am looking for, but it uses a QueueDevice to connect clients with workers, and this class seems to have been removed at some point:

var queue = new QueueDevice(
     "tcp://localhost:5555", 
     "tcp://localhost:5556", 
     DeviceMode.Threaded);

Then there is the Router-dealer example at netmq.readthedocs.io, which uses the NetMQPoller but doesn't compile, because it calls RouterSocket.ReceiveMessage() and NetMQSocket.Receive(out bool hasmore), both of which have been removed:

public static void Main(string[] args)
{
    // NOTES
    // 1. Use ThreadLocal<DealerSocket> where each thread has
    //    its own client DealerSocket to talk to server
    // 2. Each thread can send using its own socket
    // 3. Each thread socket is added to poller

    const int delay = 3000; // millis
    var clientSocketPerThread = new ThreadLocal<DealerSocket>();
    using (var server = new RouterSocket("@tcp://127.0.0.1:5556"))
    using (var poller = new NetMQPoller())
    {
        // Start some threads, each with its own DealerSocket
        // to talk to the server socket. Creates lots of sockets,
        // but no nasty race conditions and no shared state: each
        // thread has its own socket, happy days.

        for (int i = 0; i < 3; i++)
        {
            Task.Factory.StartNew(state =>
            {
                DealerSocket client = null;
                if (!clientSocketPerThread.IsValueCreated)
                {
                    client = new DealerSocket();
                    client.Options.Identity =
                        Encoding.Unicode.GetBytes(state.ToString());
                    client.Connect("tcp://127.0.0.1:5556");
                    client.ReceiveReady += Client_ReceiveReady;
                    clientSocketPerThread.Value = client;
                    poller.Add(client);
                }
                else
                {
                    client = clientSocketPerThread.Value;
                }
                while (true)
                {
                    var messageToServer = new NetMQMessage();
                    messageToServer.AppendEmptyFrame();
                    messageToServer.Append(state.ToString());
                    PrintFrames("Client Sending", messageToServer);
                    client.SendMultipartMessage(messageToServer);
                    Thread.Sleep(delay);
                }
            }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
        }

        // start the poller
        poller.RunAsync();

        // server loop
        while (true)
        {
            var clientMessage = server.ReceiveMessage();
            PrintFrames("Server receiving", clientMessage);
            if (clientMessage.FrameCount == 3)
            {
                var clientAddress = clientMessage[0];
                var clientOriginalMessage = clientMessage[2].ConvertToString();
                string response = string.Format("{0} back from server {1}",
                    clientOriginalMessage, DateTime.Now.ToLongTimeString());
                var messageToClient = new NetMQMessage();
                messageToClient.Append(clientAddress);
                messageToClient.AppendEmptyFrame();
                messageToClient.Append(response);
                server.SendMultipartMessage(messageToClient);
            }
        }
    }
}

static void PrintFrames(string operationType, NetMQMessage message)
{
    for (int i = 0; i < message.FrameCount; i++)
    {
        Console.WriteLine("{0} Socket : Frame[{1}] = {2}", operationType, i,
            message[i].ConvertToString());
    }
}

static void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
    bool hasmore = false;
    e.Socket.Receive(out hasmore);
    if (hasmore)
    {
        string result = e.Socket.ReceiveFrameString(out hasmore);
        Console.WriteLine("REPLY {0}", result);
    }
}

Perhaps these are easy to fix, but I am a NetMQ newbie and I would just like to get a proper "multithreaded Hello World" for C# working.

The NetMQ API is slightly different from the one at the official 0MQ docs (it's more idiomatic with C# and uses new low-allocation .NET constructs), so it's hard for me to understand how to properly port original C examples.

Can someone point me to a resource with up-to-date examples for this scenario, or help me understand what's the correct way to do it using the new NetMQ API?

Or is this library deprecated and I should use the clrzmq4 .NET wrapper? From what I've seen, NetMQ uses Span<T> and ref structs all around the codebase, so it seems it should be performant.

There is 1 best solution below

I've fixed the issues with the router-dealer example at netmq.readthedocs.io, and the code now works. I am posting it here in case it is useful (maybe @somdoron or somebody else can check it and update the docs too).

The example doesn't fully do what I want (a pool of workers that fairly share the jobs); I believe I still have to implement a separate broker thread for that.
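Before the full listing, here are the two substitutions that make the documentation example compile, shown in isolation. As far as I can tell, ReceiveMultipartMessage() stands in for the removed ReceiveMessage(), and ReceiveFrameBytes(out bool more) stands in for the removed Receive(out bool hasmore). This is a minimal sketch under those assumptions. The class name, the RoundTrip helper and the inproc endpoint name are made up for illustration and are not part of NetMQ or its docs.

```csharp
using System;
using NetMQ;
using NetMQ.Sockets;

internal static class ReceiveApiSketch
{
    // Sends one request DEALER -> ROUTER and returns the text of the reply.
    internal static string RoundTrip(string request)
    {
        // "@" binds and ">" connects; binding first is the safe order for inproc.
        // The endpoint name is hypothetical.
        using (var server = new RouterSocket("@inproc://receive-api-sketch"))
        using (var client = new DealerSocket(">inproc://receive-api-sketch"))
        {
            // A DEALER adds no envelope, so the empty delimiter is sent by hand.
            client.SendMoreFrameEmpty().SendFrame(request);

            // Was: server.ReceiveMessage()
            // The ROUTER prepends the sender: [identity][empty][payload].
            NetMQMessage incoming = server.ReceiveMultipartMessage();
            string payload = incoming[incoming.FrameCount - 1].ConvertToString();

            var reply = new NetMQMessage();
            reply.Append(incoming[0]);      // route back to the same client
            reply.AppendEmptyFrame();
            reply.Append("echo: " + payload);
            server.SendMultipartMessage(reply);

            // Was: Receive(out hasmore) followed by ReceiveFrameString(out hasmore)
            bool more;
            client.ReceiveFrameBytes(out more);   // the empty delimiter frame
            return more ? client.ReceiveFrameString(out more) : null;
        }
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip("hello"));
        NetMQConfig.Cleanup();   // shut down NetMQ's background threads
    }
}
```

The complete program below uses the same calls, with ReceiveMultipartMessage on the client side as well so that the delimiter and the payload arrive as one message.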

using NetMQ;
using NetMQ.Sockets;

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

internal static class Program
{
    public static void Main(string[] args)
    {
        InitLogger();

        const int NumClients = 5;
        Log($"App started with {NumClients} clients\r\n");

        using (var server = new RouterSocket("@tcp://127.0.0.1:5556"))
        using (var poller = new NetMQPoller())
        {
            // Start some threads, each with its own DealerSocket
            // to talk to the server socket. Creates lots of sockets,
            // but no nasty race conditions and no shared state: each
            // thread has its own socket, happy days.

            for (int i = 0; i < NumClients; i++)
            {
                var clientNo = i + 1;
                Task.Factory.StartNew(async () =>
                {
                    var rnd = new Random(31 + clientNo * 57);
                    var clientId = $"C{clientNo}";
                    DealerSocket client = new DealerSocket();

                    client.Options.Identity = Encoding.ASCII.GetBytes(clientId);
                    client.Connect("tcp://127.0.0.1:5556");
                    client.ReceiveReady += (sender, e) => 
                    {
                        var msg = e.Socket.ReceiveMultipartMessage(3);
                        var clientid = Encoding.ASCII.GetString(e.Socket.Options.Identity);
                        Log($"Client '{clientid}' received <- server", msg);
                    };
                    poller.Add(client);

                    while (true)
                    {
                        var messageToServer = new NetMQMessage();
                        messageToServer.Append(NetMQFrame.Empty);
                        messageToServer.Append($"Some data ({clientId}|{rnd.Next():X8})", Encoding.ASCII);

                        Log($"Client {clientId} sending -> server: ", messageToServer);
                        client.SendMultipartMessage(messageToServer);

                        await Task.Delay(3000);
                    }
                }, TaskCreationOptions.LongRunning);
            }

            // start the poller
            poller.RunAsync();

            // server loop

            while (true)
            {
                var clientMessage = server.ReceiveMultipartMessage();

                if (clientMessage.FrameCount < 1)
                {
                    Log("Server received invalid frame count");
                    continue;
                }

                var clientid = clientMessage[0];
                Log($"Server received <- {clientid.ConvertToString()}: ", clientMessage);
                
                var msg = clientMessage.Last().ConvertToString(Encoding.ASCII);
                string response = $"Replying to '{msg}'";

                {
                    var messageToClient = new NetMQMessage();
                    messageToClient.Append(clientid);
                    messageToClient.Append(NetMQFrame.Empty);
                    messageToClient.Append(response, Encoding.ASCII);
                    Log($"Server sending -> {clientid.ConvertToString()}: {response}");
                    server.SendMultipartMessage(messageToClient);
                }

            }
        }
    }

    #region Poor man's console logging 

    static BlockingCollection<string> _logQueue;

    // log using a blocking collection
    private static void InitLogger()
    {
        _logQueue = new BlockingCollection<string>();
        Task.Factory.StartNew(() =>
        {
            foreach (var msg in _logQueue.GetConsumingEnumerable())
            {
                Console.WriteLine(msg);
            }
        });
    }
    
    // thread id, timestamp, message
    static void Log(string msg)
    {
        var thid = Thread.CurrentThread.ManagedThreadId;
        var time = GetTimestamp().ToString("HH:mm:ss.fff");
        _logQueue.Add($"[T{thid:00}] {time}: {msg}");
    }

    // log all frames in a message
    static void Log(string operationType, NetMQMessage message)
    {
        var frames = string.Join(", ", message.Select((m, i) => $"F[{i}]={{{m.ConvertToString(Encoding.ASCII)}}}"));
        Log($"{operationType} {message.FrameCount} frames: " + frames);
    }

    // if possible, use win10 high precision timestamps
    static DateTime GetTimestamp()
    {
        if (Environment.OSVersion.Platform == PlatformID.Win32NT && Environment.OSVersion.Version.Major >= 10) // win 10
        {
            long fileTime;
            WinApi.GetSystemTimePreciseAsFileTime(out fileTime);
            return DateTime.FromFileTimeUtc(fileTime);
        }

        return DateTime.UtcNow;
    }

    static class WinApi
    {
        [DllImport("Kernel32.dll", CallingConvention = CallingConvention.Winapi)]
        internal static extern void GetSystemTimePreciseAsFileTime(out long filetime);
    }

    #endregion
}
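
The worker pool is not part of the program above. One possible shape for it, offered as a hedged sketch rather than a tested design, is to let NetMQ's Proxy class (which appears to fill the role of the removed QueueDevice) forward messages between the client-facing RouterSocket and an in-process DealerSocket, with each worker thread owning its own DealerSocket. The class name, the BuildReply helper, the worker count and the "inproc://workers" endpoint are assumptions made for illustration.

```csharp
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

internal static class WorkerPoolSketch
{
    const string FrontendAddress = "tcp://127.0.0.1:5556"; // same port as the program above
    const string BackendAddress = "inproc://workers";      // hypothetical endpoint name
    const int NumWorkers = 4;                              // arbitrary pool size

    public static void Main()
    {
        using (var frontend = new RouterSocket())
        using (var backend = new DealerSocket())
        {
            frontend.Bind(FrontendAddress);
            backend.Bind(BackendAddress);   // bind before the workers connect

            for (int i = 0; i < NumWorkers; i++)
            {
                int workerNo = i + 1;
                new Thread(() => Worker(workerNo)) { IsBackground = true }.Start();
            }

            // Blocks this thread and shuttles messages in both directions.
            new Proxy(frontend, backend).Start();
        }
    }

    static void Worker(int workerNo)
    {
        // The socket is created and used on this thread only.
        using (var socket = new DealerSocket())
        {
            socket.Connect(BackendAddress);
            while (true)
            {
                // Arrives as [client identity][empty][payload].
                NetMQMessage request = socket.ReceiveMultipartMessage();
                socket.SendMultipartMessage(BuildReply(request, workerNo));
            }
        }
    }

    // Builds [client identity][empty][response] so the ROUTER can route it back.
    internal static NetMQMessage BuildReply(NetMQMessage request, int workerNo)
    {
        string payload = request[request.FrameCount - 1].ConvertToString();
        var reply = new NetMQMessage();
        reply.Append(request[0]);
        reply.AppendEmptyFrame();
        reply.Append($"Worker {workerNo} replying to '{payload}'");
        return reply;
    }
}
```

With this layout the clients stay unchanged: they still connect their DealerSockets to port 5556. Note that the backend DealerSocket distributes requests round-robin, so a slow worker still receives its share. Routing each job to whichever worker is idle would need a load-balancing broker with a ROUTER backend instead.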