NetMQ: Can NetMQPoller take care of async message sending?

1.1k Views Asked by At

Does anyone have an example of using a NetMQPoller in conjunction with sending messages from an asynchronous task? i.e.

// 1. Receive a message from one of many clients
// 2. Handle client requests on multiple async Task's (may be file bound)
// 3. Respond to clients when long running Task finishes

We are currently using a very barebones Dealer/Router setup for multiple clients to talk to a single server. On the server we pass each client request on to a pool of Tasks/Threads that handle the request, generate a response, and then attempt to respond to the original client. Because I'm new to this I didn't realize calling the client on a different Task/Thread would bork things but it makes sense now.

What I'm looking for is the right way to handle this. I could definitely have a BlockingCollection of messages to send back and just service that from the original thread, but I recently discovered the NetMQPoller and it sort of sounds like it has some of this built in. I am not 100% sure if it can help me send messages on the original thread or not but there are a lot of parts to it that sound promising. Can anyone point me at an example of sending messages back to the Dealer/client from the Router/server on a different thread than the original message was received on?

UPDATE I think I may have sort-of solved my own question by combining

    class Program
    {
        static void Main( string[] args )
        {
            // NOTES
            // 1. Use ThreadLocal<DealerSocket> where each thread has
            //    its own client DealerSocket to talk to server
            // 2. Each thread can send using it own socket
            // 3. Each thread socket is added to poller
            const int delay = 3000; // millis
            var clientSocketPerThread = new ThreadLocal<DealerSocket>();

            using( var server = new RouterSocket("@tcp://127.0.0.1:5556") )
            using( var queue = new NetMQQueue<MyResponse>() )
            using( var poller = new NetMQPoller { queue } )
            {
                // Start some threads, each with its own DealerSocket
                // to talk to the server socket. Creates lots of sockets,
                // but no nasty race conditions no shared state, each
                // thread has its own socket, happy days.
                for( int i = 0; i < 3; i++ )
                {
                    Task.Factory.StartNew(state =>
                    {
                        DealerSocket client = null;
                        if( !clientSocketPerThread.IsValueCreated )
                        {
                            client = new DealerSocket();
                            client.Options.Identity =
                                Encoding.Unicode.GetBytes(state.ToString());
                            client.Connect("tcp://127.0.0.1:5556");
                            client.ReceiveReady += Client_ReceiveReady;
                            clientSocketPerThread.Value = client;
                            poller.Add(client);
                        }
                        else
                        {
                            client = clientSocketPerThread.Value;
                        }

                        while( true )
                        {
                            var messageToServer = new NetMQMessage();
                            messageToServer.AppendEmptyFrame();
                            messageToServer.Append(state.ToString());
                            Console.WriteLine("======================================");
                            Console.WriteLine(" OUTGOING MESSAGE TO SERVER ");
                            Console.WriteLine("======================================");
                            PrintFrames("Client Sending", messageToServer);
                            client.SendMultipartMessage(messageToServer);
                            Thread.Sleep(delay);
                        }
                    }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                }

                queue.ReceiveReady += ( sender, e ) =>
                {
                    var queueItem = e.Queue.Dequeue();

                    var messageToClient = new NetMQMessage();
                    messageToClient.Append(queueItem.ClientId);
                    messageToClient.AppendEmptyFrame();
                    messageToClient.Append(queueItem.MessageToClient);
                    server.SendMultipartMessage(messageToClient);
                };

                // start the poller
                poller.RunAsync();

                // server loop
                while( true )
                {
                    var clientMessage = server.ReceiveMultipartMessage();
                    Console.WriteLine("======================================");
                    Console.WriteLine(" INCOMING CLIENT MESSAGE FROM CLIENT ");
                    Console.WriteLine("======================================");
                    PrintFrames("Server receiving", clientMessage);
                    if( clientMessage.FrameCount == 3 )
                    {
                        var clientAddress = clientMessage[0];
                        var clientOriginalMessage = clientMessage[2].ConvertToString();
                        string response = string.Format("{0} back from server {1}",
                            clientOriginalMessage, DateTime.Now.ToLongTimeString());

                        Task.Factory.StartNew(async() =>
                        {
                            await Task.Delay(200);

                            var clientResponse = new MyResponse()
                            {
                                ClientId = clientAddress,
                                MessageToClient = response
                            };
                            queue.Enqueue(clientResponse);
                        }, TaskCreationOptions.LongRunning);
                    }
                }
            }
        }

        static void PrintFrames( string operationType, NetMQMessage message )
        {
            for( int i = 0; i < message.FrameCount; i++ )
            {
                Console.WriteLine("{0} Socket : Frame[{1}] = {2}", operationType, i,
                    message[i].ConvertToString());
            }
        }

        static void Client_ReceiveReady( object sender, NetMQSocketEventArgs e )
        {
            bool hasmore = false;
            e.Socket.ReceiveFrameString(out hasmore);
            if( hasmore )
            {
                string result = e.Socket.ReceiveFrameString(out hasmore);
                Console.WriteLine("REPLY {0}", result);
            }
        }

        class MyResponse
        {
            public NetMQFrame ClientId;

            public string MessageToClient;
        }
    }
0

There are 0 best solutions below