I am trying to create a single server/multiple clients NetMQ test program (I believe this should use the Router-Dealer pattern) which would allow the server to process unique client requests using workers on separate threads (and reply to each client separately), but I cannot find a single working example which works with the current nuget version (v4.0.1.5 at the time of writing).
Each client is just supposed to send a request (which contains its Id since it's a Dealer socket), and get its dedicated response from the server. Server should have a pool of threads (IOCP style) which will consume messages and send back responses.
I found the Multithreaded example in the NetMQ/Samples github, which seems to be what I am looking for, but it uses a QueueDevice
to connect clients with workers, and this class seems to have been removed at some point:
var queue = new QueueDevice(
"tcp://localhost:5555",
"tcp://localhost:5556",
DeviceMode.Threaded);
Then there is the Router-dealer example at netmq.readthedocs.io, which uses the NetMQPoller
, but doesn't compile because it calls the non-existent RouterSocket.ReceiveMessage()
and NetMQSocket.Receive(out bool hasmore)
methods, which have been removed:
public static void Main(string[] args)
{
// NOTES
// 1. Use ThreadLocal<DealerSocket> where each thread has
// its own client DealerSocket to talk to server
// 2. Each thread can send using it own socket
// 3. Each thread socket is added to poller
const int delay = 3000; // millis
var clientSocketPerThread = new ThreadLocal<DealerSocket>();
using (var server = new RouterSocket("@tcp://127.0.0.1:5556"))
using (var poller = new NetMQPoller())
{
// Start some threads, each with its own DealerSocket
// to talk to the server socket. Creates lots of sockets,
// but no nasty race conditions no shared state, each
// thread has its own socket, happy days.
for (int i = 0; i < 3; i++)
{
Task.Factory.StartNew(state =>
{
DealerSocket client = null;
if (!clientSocketPerThread.IsValueCreated)
{
client = new DealerSocket();
client.Options.Identity =
Encoding.Unicode.GetBytes(state.ToString());
client.Connect("tcp://127.0.0.1:5556");
client.ReceiveReady += Client_ReceiveReady;
clientSocketPerThread.Value = client;
poller.Add(client);
}
else
{
client = clientSocketPerThread.Value;
}
while (true)
{
var messageToServer = new NetMQMessage();
messageToServer.AppendEmptyFrame();
messageToServer.Append(state.ToString());
PrintFrames("Client Sending", messageToServer);
client.SendMultipartMessage(messageToServer);
Thread.Sleep(delay);
}
}, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
}
// start the poller
poller.RunAsync();
// server loop
while (true)
{
var clientMessage = server.ReceiveMessage();
PrintFrames("Server receiving", clientMessage);
if (clientMessage.FrameCount == 3)
{
var clientAddress = clientMessage[0];
var clientOriginalMessage = clientMessage[2].ConvertToString();
string response = string.Format("{0} back from server {1}",
clientOriginalMessage, DateTime.Now.ToLongTimeString());
var messageToClient = new NetMQMessage();
messageToClient.Append(clientAddress);
messageToClient.AppendEmptyFrame();
messageToClient.Append(response);
server.SendMultipartMessage(messageToClient);
}
}
}
}
static void PrintFrames(string operationType, NetMQMessage message)
{
for (int i = 0; i < message.FrameCount; i++)
{
Console.WriteLine("{0} Socket : Frame[{1}] = {2}", operationType, i,
message[i].ConvertToString());
}
}
static void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
bool hasmore = false;
e.Socket.Receive(out hasmore);
if (hasmore)
{
string result = e.Socket.ReceiveFrameString(out hasmore);
Console.WriteLine("REPLY {0}", result);
}
}
Perhaps these are easy to fix, but I am a NetMQ newbie and I would just like to get a proper "multithreaded Hello World" for C# working.
The NetMQ API is slightly different from the one at the official 0MQ docs (it's more idiomatic with C# and uses new low-allocation .NET constructs), so it's hard for me to understand how to properly port original C examples.
Can someone point me to a resource with up-to-date examples for this scenario, or help me understand what's the correct way to do it using the new NetMQ API?
Or is this library deprecated and I should use the clrzmq4 .NET wrapper? From what I've seen, NetMQ uses Span<T>
and ref structs all around the codebase, so it seems it should be performant.
I've fixed those several issues with the router-dealer example at netmq.readthedocs.io, and now the code is working. I am posting it here, perhaps it will be useful (maybe @somdoron or somebody can check it out and update the docs too).
The example doesn't fully do what I'd want (to have a pool of workers which fairly get jobs), I believe I have to implement the separate broker thread.