We have a uWebSockets.js Node server behind an HAProxy load balancer. A user can request live or historical data, depending on the request sent to the server. When a user requests live data, they are subscribed to a channel for that data stream.
We would like to limit each user to 3 live-data connections. I have tried to implement this but ran into some problems:
When a user doesn't disconnect gracefully from the WebSocket server, that user's live-connection count is never decremented.
Because the user is SUBSCRIBING to live data, I have been unable to implement a ping/pong system to check whether the user is still connected. This may be a limitation of subscriptions: perhaps a user who is subscribed to a channel cannot use the send method without being unsubscribed from it. I'm not entirely sure.
How can I handle disconnections when using subscribe? It seems that ping/pong is not an option for me, and I can't create another instance of uWS because the server is behind a load balancer that forwards each request to only one WebSocket server.
Here is the code. It might look strange because I removed a lot of proprietary code; I tried to make it as clean as possible.
// Assumed from usage below: uWS is the uWebSockets.js module and decoder is a Node StringDecoder.
const uWS = require('uWebSockets.js');
const { StringDecoder } = require('string_decoder');

const decoder = new StringDecoder('utf8');
const wsConns = {};
const connsByUid = {}; // uid -> number of live-data connections
// PORT is assumed to be defined elsewhere.

const wapp = uWS.App().ws('/*', {
  compression: uWS.DISABLED,
  maxPayloadLength: 1024,
  maxBackpressure: 1024 * 10,
  closeOnBackpressureLimit: false,
  upgrade: (res, req, context) => {
    /* This immediately calls open handler, you must not use res after this call */
    res.upgrade({
        url: req.getUrl(),
        ip: req.getHeader('x-forwarded-for'),
      },
      /* Spell these correctly */
      req.getHeader('sec-websocket-key'),
      req.getHeader('sec-websocket-protocol'),
      req.getHeader('sec-websocket-extensions'),
      context);
  },
  open: (ws) => {
    ws.newConnection = true;
  },
  message: async (ws, message, isBinary) => {
    try {
      // Parse inside the try block so malformed JSON is caught and logged
      const json = JSON.parse(decoder.write(Buffer.from(message)));
      switch (json.action) {
        case 'live':
          ws.uid = json.uid;
          if (json.subscribe && ws.newConnection) {
            if (connsByUid[json.uid]) {
              // Reject the subscription once the user already has 3 live connections
              if (connsByUid[json.uid] >= 3) {
                ws.send(JSON.stringify({
                  action: 'Max Connections Reached',
                }));
                return;
              }
              connsByUid[json.uid]++;
            } else {
              connsByUid[json.uid] = 1;
            }
            ws.newConnection = false;
            ws.subscribed = true;
          } else {
            if (ws.subscribed) {
              connsByUid[ws.uid]--;
              ws.subscribed = false;
            }
          }
          if (ws.subscribed) {
            ws.subscribe(json.symbol + json.timeFrame);
          }
          break;
      }
    } catch (e) {
      console.log(e);
    }
  },
  close: (ws, code, message) => {
    delete wsConns[ws.uid];
    if (ws.subscribed) {
      // Decrement the same in-memory counter that the message handler increments
      connsByUid[ws.uid]--;
    }
  },
  drain: (ws) => {
    ws.getBufferedAmount();
  }
}).listen(PORT, (listenSocket) => {
  if (listenSocket) {
    console.log('Listening to port ', PORT);
  }
});
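
To make the counting problem easier to reason about, the per-user limit described above can be separated from the uWS handlers. This is only a minimal sketch: `createLimiter`, `acquire`, `release`, and `count` are hypothetical names for illustration, not part of uWebSockets.js, and it assumes counts are kept in memory on a single server process.

```javascript
// Hypothetical helper (not part of uWebSockets.js): tracks how many
// live-data connections each uid currently holds.
function createLimiter(maxPerUser) {
  const counts = new Map(); // uid -> number of active live connections

  return {
    // Reserves a slot and returns true, or returns false if the user is at the limit.
    acquire(uid) {
      const current = counts.get(uid) || 0;
      if (current >= maxPerUser) return false;
      counts.set(uid, current + 1);
      return true;
    },
    // Frees one slot; the count never drops below zero and empty entries are removed.
    release(uid) {
      const current = counts.get(uid) || 0;
      if (current <= 1) counts.delete(uid);
      else counts.set(uid, current - 1);
    },
    // Current number of slots held by this uid.
    count(uid) {
      return counts.get(uid) || 0;
    },
  };
}

// Same limit as in the question: 3 live connections per user.
const limiter = createLimiter(3);
```

With a helper like this, the message handler would call `acquire` when a subscription starts and the close handler would call `release` exactly once per subscribed socket. The open problem remains the same: if the close handler never runs for a dead connection, nothing calls `release` and the slot stays taken.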