I am using nodejs, socketio, and redis on my backend server and angular v15 with ngx-socketio2 for the client. I can see that the middleware io.use, channel subscription, the socket subscribe event, and redis subscription are all working. Yet for some reason the channel emit wont work. I have tried both socket.emit and io.emit and neither works. The socket.emit fails silently, never firing the response callback. The io.emit responds immediately with:
Error: operation has timed out at Timeout._onTimeout (<project-path>/node_modules/socket.io/dist/broadcast-operator.js:182:17)
I can also see from console and network that the browser never received the channel emit, though client seems to communicate with the server: Socket created, Client emit received,Client subscribe received and the server is properly subscribed to Redis.
The code is pretty straightforward:
const express = require("express");
const app = express();
const url = require("node:url");
const server = require("http").Server(app);
// So that it can be accessed from any origin
const io = require("socket.io")(server, {
cors: {
origin: "*",
},
});
const redis = require("redis");
const rtRedisUrl = process.env.REALTIME_REDIS || "redis://localhost:6379/0";
const main = async () => {
// Creating a redis client
const redisClientPub = redis.createClient({
url: rtRedisUrl,
});
await redisClientPub.connect();
// creating a redis subscriber
const redisClientSub = redisClientPub.duplicate();
await redisClientSub.connect();
const redisGetSet = redisClientPub.duplicate();
await redisGetSet.connect();
app.get("/", (req, res) => {
res.send("success");
});
// setup middleware
io.use(async (socket, next) =>{
var sessionId = null;
var userId = null;
// var url = require('url');
var requestUrl = url.parse(socket.request.url);
var requestQuery = requestUrl.query;
var requestParams = requestQuery.split('&');
var params = {};
for (let i=0; i<=requestParams.length; i++){
let param = requestParams[i];
if (param){
let p=param.split('=');
if (p.length != 2) { continue };
params[p[0]] = p[1];
}
}
sessionId = params["_rtToken"];
userId = params["_rtUserId"];
// retrieve session from redis using the unique key stored in cookies
console.log('authorize: redis.getSet');
console.log(userId);
console.log(sessionId);
// redis.getSet.hGetAll("rtSession-" + userId)
const session= await redisGetSet.HGET(("rtSession-" + userId), sessionId);
console.log(session);
if (!session) {
console.log('redisGetSet.hget err');
next(new Error('Unauthorized Realtime user (session)'));
} else {
console.log('redisGetSet.hget success');
socket.request.session = JSON.parse(session);
next();
};
});
// Creating a websocket connection.
// channel should be realtime_msg ..as requested from client
io.on("connection", (socket) => {
socket.on("subscribe", async (channel) => { // works
console.log('socket subscription: '+channel);
socket.join(channel);
await redisClientSub.subscribe(channel, (message) => { //works
var msg = JSON.parse(message);
console.log('in redis subscribe channel: '+channel);
io.emit(channel, msg, (response) => { //Failing
console.log('socket emit response');
console.log(response); // "got it"
} );
console.log('socket after emit:');
});
});
socket.on("unsubscribe", async (channel) => {
await redisClientSub.unsubscribe(channel);
});
socket.on('realtime_user_id_connected', (message) => { // works
console.log('Realtime User ID connected: ' + message.userId);
});
});
the package.json:
{
"name": "headcount-realtime",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node server.js"
},
"engines": {
"npm": "9.4.0",
"node": "v19.6.0"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"express": "4.18.2",
"redis": "4.6.6",
"socket.io": "4.6.1"
}
}
When a client connects it sends along a user_id and token to which is verified in the redisGetSet call. The client then emits 'realtime_user_id_connected' and subscribes to the 'realtime_msg' channel. The output on the server is:
> [email protected] start
> node server.js
Server is running
authorize: redis.getSet
4
b7925e14f30b2090750bfe571b0863fd
{"user_id":4}
redisGetSet.hget success
authorize: redis.getSet
4
b7925e14f30b2090750bfe571b0863fd
{"user_id":4}
redisGetSet.hget success
Realtime User ID connected: 4
socket subscription: realtime_msg
{
msg: {
resource: 'alerts',
action: 'update',
id: 13,
obj: {
msg: 'testing testing 1 2 3',
id: 13
}
},
recipient_user_ids: [ -1, 42 ]
}
in redis subscribe channel: realtime_msg
socket after emit:
socket emit response
Error: operation has timed out
at Timeout._onTimeout (<project-folder>/node_modules/socket.io/dist/broadcast-operator.js:182:17)
output with with DEBUG enabled:
express:application set "x-powered-by" to true +0ms
express:application set "etag" to 'weak' +2ms
express:application set "etag fn" to [Function: generateETag] +0ms
express:application set "env" to 'development' +1ms
express:application set "query parser" to 'extended' +0ms
express:application set "query parser fn" to [Function: parseExtendedQueryString] +2ms
express:application set "subdomain offset" to 2 +0ms
express:application set "trust proxy" to false +0ms
express:application set "trust proxy fn" to [Function: trustNone] +1ms
express:application booting in development mode +0ms
express:application set "view" to [Function: View] +0ms
express:application set "views" to '/Users/dreadstar/workspace/headcount-realtime/views' +1ms
express:application set "jsonp callback name" to 'callback' +0ms
socket.io:server initializing namespace / +0ms
socket.io:server creating engine.io instance with opts {"cors":{"origin":"*"},"cleanupEmptyChildNamespaces":false,"path":"/socket.io"} +1ms
socket.io:server attaching client serving req handler +4ms
express:router use '/' query +583ms
express:router:layer new '/' +0ms
express:router use '/' expressInit +1ms
express:router:layer new '/' +0ms
express:router:route new '/' +1ms
express:router:layer new '/' +0ms
express:router:route get '/' +1ms
express:router:layer new '/' +0ms
Server is running
engine intercepting request for path "/socket.io/" +0ms
engine handling "GET" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8hxeE" +1ms
engine applying middleware n°1 +1ms
engine handshaking client "dwnoIS9YABCzpow1AAAA" +3ms
engine:transport readyState updated from undefined to open (polling) +0ms
engine:socket readyState updated from undefined to opening +0ms
engine:socket readyState updated from opening to open +1ms
engine:socket sending packet "open" ({"sid":"dwnoIS9YABCzpow1AAAA","upgrades":["websocket"],"pingInterval":25000,"pingTimeout":20000,"maxPayload":1000000}) +0ms
engine:polling setting request +0ms
engine:socket flushing buffer to transport +0ms
engine:polling writing "0{"sid":"dwnoIS9YABCzpow1AAAA","upgrades":["websocket"],"pingInterval":25000,"pingTimeout":20000,"maxPayload":1000000}" +1ms
engine:socket executing batch send callback +11ms
socket.io:server incoming connection with id dwnoIS9YABCzpow1AAAA +774ms
engine intercepting request for path "/socket.io/" +25ms
engine handling "POST" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8hxeo&sid=dwnoIS9YABCzpow1AAAA" +0ms
engine applying middleware n°1 +1ms
engine setting new request for existing client +1ms
engine:polling received "40" +26ms
engine:socket received packet message +17ms
socket.io-parser decoded 0 as {"type":0,"nsp":"/"} +0ms
socket.io:client connecting to namespace / +0ms
socket.io:namespace adding socket to nsp / +0ms
authorize: redis.getSet
4
b7925e14f30b2090750bfe571b0863fd
engine applying middleware n°1 +18ms
engine writing headers: {"Access-Control-Allow-Origin":"*"} +3ms
engine upgrading existing transport +3ms
engine:transport readyState updated from undefined to open (websocket) +51ms
engine:socket might upgrade socket transport from "polling" to "websocket" +22ms
engine intercepting request for path "/socket.io/" +1ms
engine handling "GET" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8hxew&sid=dwnoIS9YABCzpow1AAAA" +0ms
engine applying middleware n°1 +0ms
engine setting new request for existing client +0ms
engine:polling setting request +24ms
engine:ws received "2probe" +0ms
engine:socket got probe ping packet, sending pong +2ms
engine:ws writing "3probe" +0ms
{"user_id":4}
redisGetSet.hget success
socket.io:socket socket connected - writing packet +0ms
socket.io:socket join room QYUIpAQxlw_bi-yeAAAB +0ms
socket.io-parser encoding packet {"type":0,"data":{"sid":"QYUIpAQxlw_bi-yeAAAB"},"nsp":"/"} +22ms
socket.io-parser encoded {"type":0,"data":{"sid":"QYUIpAQxlw_bi-yeAAAB"},"nsp":"/"} as 0{"sid":"QYUIpAQxlw_bi-yeAAAB"} +0ms
engine:socket sending packet "message" (0{"sid":"QYUIpAQxlw_bi-yeAAAB"}) +4ms
engine:socket flushing buffer to transport +0ms
engine:polling writing "40{"sid":"QYUIpAQxlw_bi-yeAAAB"}" +5ms
engine:ws received "5" +7ms
engine:socket got upgrade packet - upgrading +3ms
engine:transport readyState updated from open to closing (polling) +10ms
engine:polling closing +4ms
engine:polling transport discarded - closing right away +0ms
engine:transport readyState updated from closing to closed (polling) +0ms
engine:socket writing ping packet - expecting pong within 20000ms +25s
engine:socket sending packet "ping" (undefined) +0ms
engine:socket flushing buffer to transport +0ms
engine:ws writing "2" +25s
engine:socket executing batch send callback +1ms
engine:ws received "3" +1ms
engine:socket received packet pong +1ms
engine:socket got pong +0ms
engine:transport readyState updated from open to closed (websocket) +34s
engine:socket readyState updated from open to closed +9s
socket.io:client client close with reason transport close +34s
socket.io:socket closing socket - reason transport close +34s
engine intercepting request for path "/socket.io/" +2m
engine handling "GET" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8iRx5" +1ms
engine applying middleware n°1 +2ms
engine handshaking client "6jpqT3pb-QM5YVaSAAAC" +3ms
engine:transport readyState updated from undefined to open (polling) +2m
engine:socket readyState updated from undefined to opening +2m
engine:socket readyState updated from opening to open +0ms
engine:socket sending packet "open" ({"sid":"6jpqT3pb-QM5YVaSAAAC","upgrades":["websocket"],"pingInterval":25000,"pingTimeout":20000,"maxPayload":1000000}) +0ms
engine:polling setting request +2m
engine:socket flushing buffer to transport +2ms
engine:polling writing "0{"sid":"6jpqT3pb-QM5YVaSAAAC","upgrades":["websocket"],"pingInterval":25000,"pingTimeout":20000,"maxPayload":1000000}" +1ms
engine:socket executing batch send callback +3ms
socket.io:server incoming connection with id 6jpqT3pb-QM5YVaSAAAC +2m
engine intercepting request for path "/socket.io/" +26ms
engine handling "POST" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8iRyQ&sid=6jpqT3pb-QM5YVaSAAAC" +0ms
engine applying middleware n°1 +1ms
engine setting new request for existing client +0ms
engine:polling received "40" +24ms
engine:socket received packet message +22ms
socket.io-parser decoded 0 as {"type":0,"nsp":"/"} +2m
socket.io:client connecting to namespace / +2m
socket.io:namespace adding socket to nsp / +2m
authorize: redis.getSet
4
b7925e14f30b2090750bfe571b0863fd
engine applying middleware n°1 +9ms
engine writing headers: {"Access-Control-Allow-Origin":"*"} +1ms
engine upgrading existing transport +1ms
engine:transport readyState updated from undefined to open (websocket) +38ms
engine:socket might upgrade socket transport from "polling" to "websocket" +10ms
engine intercepting request for path "/socket.io/" +8ms
engine handling "GET" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8iRyo&sid=6jpqT3pb-QM5YVaSAAAC" +0ms
engine applying middleware n°1 +0ms
engine setting new request for existing client +0ms
engine:polling setting request +19ms
engine:ws received "2probe" +2m
engine:socket got probe ping packet, sending pong +20ms
engine:ws writing "3probe" +0ms
{"user_id":4}
redisGetSet.hget success
socket.io:socket socket connected - writing packet +2m
socket.io:socket join room eDYabUPf_IiKdurPAAAD +0ms
socket.io-parser encoding packet {"type":0,"data":{"sid":"eDYabUPf_IiKdurPAAAD"},"nsp":"/"} +57ms
socket.io-parser encoded {"type":0,"data":{"sid":"eDYabUPf_IiKdurPAAAD"},"nsp":"/"} as 0{"sid":"eDYabUPf_IiKdurPAAAD"} +0ms
engine:socket sending packet "message" (0{"sid":"eDYabUPf_IiKdurPAAAD"}) +28ms
engine:socket flushing buffer to transport +0ms
engine:polling writing "40{"sid":"eDYabUPf_IiKdurPAAAD"}" +39ms
engine:ws received "5" +42ms
engine:socket got upgrade packet - upgrading +14ms
engine:transport readyState updated from open to closing (polling) +64ms
engine:polling closing +16ms
engine:polling transport discarded - closing right away +0ms
engine:transport readyState updated from closing to closed (polling) +0ms
engine:ws received "42["realtime_user_id_connected",{"userId":4}]" +8ms
engine:socket received packet message +8ms
socket.io-parser decoded 2["realtime_user_id_connected",{"userId":4}] as {"type":2,"nsp":"/","data":["realtime_user_id_connected",{"userId":4}]} +23ms
socket.io:socket got packet {"type":2,"nsp":"/","data":["realtime_user_id_connected",{"userId":4}]} +24ms
socket.io:socket emitting event ["realtime_user_id_connected",{"userId":4}] +2ms
socket.io:socket dispatching an event ["realtime_user_id_connected",{"userId":4}] +1ms
Realtime User ID connected: 4
engine:ws received "42["subscribe","realtime_msg"]" +8ms
engine:socket received packet message +8ms
socket.io-parser decoded 2["subscribe","realtime_msg"] as {"type":2,"nsp":"/","data":["subscribe","realtime_msg"]} +7ms
socket.io:socket got packet {"type":2,"nsp":"/","data":["subscribe","realtime_msg"]} +4ms
socket.io:socket emitting event ["subscribe","realtime_msg"] +0ms
socket.io:socket dispatching an event ["subscribe","realtime_msg"] +0ms
socket subscription: realtime_msg
socket.io:socket join room realtime_msg +1ms
engine:socket writing ping packet - expecting pong within 20000ms +25s
engine:socket sending packet "ping" (undefined) +3ms
engine:socket flushing buffer to transport +2ms
engine:ws writing "2" +25s
engine:socket executing batch send callback +0ms
engine:ws received "3" +1ms
engine:socket received packet pong +1ms
engine:socket got pong +0ms
engine:socket writing ping packet - expecting pong within 20000ms +25s
engine:socket sending packet "ping" (undefined) +0ms
engine:socket flushing buffer to transport +0ms
engine:ws writing "2" +25s
engine:ws received "3" +2ms
engine:socket received packet pong +3ms
engine:socket got pong +0ms
{
msg: {
resource: 'alerts',
action: 'update',
id: 13,
obj: {
msg: 'testing testing 1 2 3',
id: 13,
}
},
recipient_user_ids: [ -1, 42 ]
}
in redis subscrbe channel: realtime_msg
socket.io:socket emitting packet with ack id 0 +1m
socket.io-parser encoding packet {"type":2,"data":["realtime_msg",{"msg":{"resource":"alerts","action":"update","id":13,"obj":{"msg":"testing testing 1 2 3","id":13}},"recipient_user_ids":[-1,42]}]"}],"id":0,"nsp":"/"} +1m
socket.io-parser encoded {"type":2,"data":["realtime_msg",{"msg":{"resource":"alerts","action":"update","id":13,"obj":{"msg":"testing testing 1 2 3","id":13}},"recipient_user_ids":[-1,42]}]","id":0,"nsp":"/"} as 20["realtime_msg",{"msg":{"resource":"alerts","action":"update","id":13,"obj":{"msg":"testing testing 1 2 3","id":13}},"recipient_user_ids":[-1,42]}]" +0ms
engine:socket sending packet "message" (20["realtime_msg",{"msg":{"resource":"alerts","action":"update","id":13,"obj":{"msg":"testing testing 1 2 3","id":13}},"recipient_user_ids":[-1,42]}]") +15s
engine:socket flushing buffer to transport +0ms
engine:ws writing "420["realtime_msg",{"msg":{"resource":"alerts","action":"update","id":13,"obj":{"msg":"testing testing 1 2 3","id":13}},"recipient_user_ids":[-1,42]}]" +15s
socket after emit
engine:socket writing ping packet - expecting pong within 20000ms +10s
engine:socket sending packet "ping" (undefined) +0ms
engine:socket flushing buffer to transport +0ms
engine:ws writing "2" +10s
engine:ws received "3" +1ms