uWebsockets: Limit connections

353 Views Asked by At

We have a uWebsocket node server that's behind a haproxy load balancer. A user can request live data or historical data depending on the request being sent to the server. When a user requests live data, they are subscribed to a channel for that data stream.

We would like to limit 3 connections to live data per user. I have tried to implement this but ran into some problems:

  1. When a user doesn't gracefully disconnect from the websocket server, the count of user's live connections doesn't decrement.

  2. Because the user is SUBSCRIBING to live data I have been unsuccessful in implementing a ping/pong system to check if the user is still connected. Probably a limitation for when a user is a subscribed to a channel, they cannot use the send method or else the user will be unsubscribed to the channel. I'm not entirely sure.

How can I handle disconnections when using subscribe? It seems that the ping/pong method is not an option for me, and I can't create another instance of uws because the server is behind a load balancer that forwards the request to only one websocket server.

Here is the code, it might look strange because I removed a bunch of proprietary code. I tried to make it look as clean as possible.

const wsConns = {};
const connsByUid = {};

const wapp=uWS.App().ws('/*', {
  compression: uWS.DISABLED,
  maxPayloadLength: 1024,
  maxBackpressure: 1024*10,
  closeOnBackpressureLimit:false,
  upgrade: (res, req, context) => {
    /* This immediately calls open handler, you must not use res after this call */
    res.upgrade({
      url: req.getUrl(),
      ip: req.getHeader("x-forwarded-for"),
    },
    /* Spell these correctly */
    req.getHeader('sec-websocket-key'),
    req.getHeader('sec-websocket-protocol'),
    req.getHeader('sec-websocket-extensions'),
    context);
  },
  open:(ws)=>{
    ws.newConnection = true;
  },
  message: async (ws, message, isBinary) => {

    let json = JSON.parse(decoder.write(Buffer.from(message)));
    try {
      switch (json.action) {
        case 'live':
          ws.uid = json.uid;
          if (json.subscribe && ws.newConnection) {
            if (connsByUid && connsByUid[json.uid]) {
              if (connsByUid[json.uid] >= 3) {
                ws.send(JSON.stringify({
                  action: 'Max Connections Reached',
                }));
                return;
              }
              connsByUid[json.uid]++;
            } else {
              connsByUid[json.uid] = 1;
            }
            ws.newConnection = false;
            ws.subscribed = true;

          } else {
            if (ws.subscribed) {
              connsByUid[ws.uid]--;
              ws.subscribed = false;
            }
          }

          if (ws.subscribed) {
            ws.subscribe(json.symbol+json.timeFrame);
          }

          break;
      }
    } catch (e) {
      console.log(e);
    }
  
  },
  close: async (ws, code, message) =>{
    delete wsConns[ws.uid];
    
    if (ws.subscribed) {
      const connsByUid = JSON.parse(await redisAlerts.get('connsByUid')) || {};
      connsByUid[ws.uid]--;
    }
  },
  drain:ws=>{
    ws.getBufferedAmount();
  }
}).listen(PORT, (listenSocket) => {
  if (listenSocket) {
    console.log('Listening to port ', PORT);
  }
});
0

There are 0 best solutions below