NetMQ response socket poll fails after succeeding one time


I'm new to the world of ZeroMQ, and I'm working through the documentation for both NetMQ and ZeroMQ as I go. I'm currently implementing (or preparing to implement) the Paranoid Pirate Pattern, and I've hit a snag. I have a single app which runs the server(s), the clients, and eventually the queue, though I haven't implemented the queue yet. Right now, only one server should be running at a time. I can launch as many clients as I like, all communicating with the single server. I am able to have my server "crash" and restart it (manually for now, automatically soon). That all works. Or at least, restarting the server works once.

To enforce that there's only a single server running, I have a thread (which I'll call the WatchThread) which opens a response socket, binds it to an address, and polls for messages. When the server dies, it signals its demise, and the WatchThread decrements the server count when it receives that signal. Here's the code that is failing:

//This is the server's main loop:

        public void Start(object? count)
        {
            num = (int)(count ?? -1);
            _model.WriteMessage($"Server {num} up");
            var rng = new Random();
            using ResponseSocket server = new();

            server.Bind(tcpLocalhost); //This is for talking to the clients
            int cycles = 0;
            while (true)
            {
                var message = server.ReceiveFrameString();
                if (message == "Kill")
                {
                    server.SendFrame("Dying");
                    return;
                }
                if (cycles++ > 3 && rng.Next(0, 16) == 0)
                {
                    _model.WriteMessage($"Server {num} \"Crashing\"");
                    RequestSocket sock = new(); //This is for talking to the WatchThread
                    sock.Connect(WatchThreadString);
                    sock.SendFrame("Dying"); //This isn't working correctly
                    return;
                }
                if (cycles > 3 && rng.Next(0, 10) == 0)
                {
                    _model.WriteMessage($"Server {num}: Slowdown");
                    Thread.Sleep(1000);
                }
                server.SendFrame($"Server{num}: {message}");
            }

        }

And here's the WatchThread code:

    public const string WatchThreadString = "tcp://localhost:5000";

    private void WatchServers()
    {
        _watchThread = new ResponseSocket(WatchThreadString);

        _watchThread.ReceiveReady += OnWatchThreadOnReceiveReady;

        while (_listen)
        {
            bool result = _watchThread.Poll(TimeSpan.FromMilliseconds(1000));
        }
    }

    private void OnWatchThreadOnReceiveReady(object? s, NetMQSocketEventArgs a)
    {
        lock (_countLock)
        {
            ServerCount--;
        }

        _watchThread.ReceiveFrameBytes();
    }

As you can see, it's pretty straightforward. What am I missing? What should happen is exactly what does happen the first time everything is instantiated: the server is supposed to go down, so it opens a new socket to the pre-existing WatchThread and sends a frame. The WatchThread receives the message and decrements the counter appropriately. It's only with the second server that things don't behave as expected...

Edit: I was able to get it to work by unbinding/closing _watchThread and recreating it (a rough sketch is below)... it's definitely suboptimal, and it still seems like I'm missing something. It's almost as if, for some reason, I can only use that socket once, though I have other request sockets that are used multiple times.
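
For reference, this is roughly what the workaround looks like. It's a simplified sketch rather than my exact code, so details like where the socket gets torn down and recreated may differ slightly:

    private void WatchServers()
    {
        _watchThread = new ResponseSocket(WatchThreadString);
        _watchThread.ReceiveReady += OnWatchThreadOnReceiveReady;

        while (_listen)
        {
            bool result = _watchThread.Poll(TimeSpan.FromMilliseconds(1000));

            if (result)
            {
                // Workaround: after a "Dying" frame has been handled, tear the
                // socket down and stand it back up so the next frame is seen.
                _watchThread.ReceiveReady -= OnWatchThreadOnReceiveReady;
                _watchThread.Close(); // closing also releases the bound endpoint

                _watchThread = new ResponseSocket(WatchThreadString);
                _watchThread.ReceiveReady += OnWatchThreadOnReceiveReady;
            }
        }
    }

With that in place every "Dying" frame gets handled, but recreating a bound socket after every message surely can't be the intended pattern.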

Additional Edit: Here's my netstat output with 6 clients running (kubernetes is in my hosts file as 127.0.0.1, as is detailed here):

  TCP    127.0.0.1:5555         MyComputerName:0       LISTENING
  TCP    127.0.0.1:5555         kubernetes:64243       ESTABLISHED
  TCP    127.0.0.1:5555         kubernetes:64261       ESTABLISHED
  TCP    127.0.0.1:5555         kubernetes:64264       ESTABLISHED
  TCP    127.0.0.1:5555         kubernetes:64269       ESTABLISHED
  TCP    127.0.0.1:5555         kubernetes:64272       ESTABLISHED
  TCP    127.0.0.1:5555         kubernetes:64273       ESTABLISHED