Nodejs Socketio server socket.emit nor io.emit working but socket.on does

308 Views Asked by At

I am using nodejs, socketio, and redis on my backend server and angular v15 with ngx-socketio2 for the client. I can see that the middleware io.use, channel subscription, the socket subscribe event, and redis subscription are all working. Yet for some reason the channel emit wont work. I have tried both socket.emit and io.emit and neither works. The socket.emit fails silently, never firing the response callback. The io.emit responds immediately with: Error: operation has timed out at Timeout._onTimeout (<project-path>/node_modules/socket.io/dist/broadcast-operator.js:182:17)

I can also see from console and network that the browser never received the channel emit, though client seems to communicate with the server: Socket created, Client emit received,Client subscribe received and the server is properly subscribed to Redis.

The code is pretty straightforward:

const express = require("express");
const app = express();
const url = require("node:url");
const server = require("http").Server(app);
// So that it can be accessed from any origin
const io = require("socket.io")(server, {
  cors: {
    origin: "*",
  },
});
const redis = require("redis");
const rtRedisUrl = process.env.REALTIME_REDIS || "redis://localhost:6379/0";

const main = async () => {
  // Creating a redis client
  const redisClientPub = redis.createClient({
    url: rtRedisUrl,
  });
  await redisClientPub.connect();

  // creating a redis subscriber
  const redisClientSub = redisClientPub.duplicate();
  await redisClientSub.connect();
  const redisGetSet = redisClientPub.duplicate();
  await redisGetSet.connect();

  app.get("/", (req, res) => {
    res.send("success");
  });
  // setup middleware
  io.use(async (socket, next) =>{
    var sessionId = null;
    var userId = null;

    // var url = require('url');
    var requestUrl = url.parse(socket.request.url);
    var requestQuery = requestUrl.query;
    var requestParams = requestQuery.split('&');
    var params = {};
    for (let i=0; i<=requestParams.length; i++){
      let param = requestParams[i];
      if (param){
        let p=param.split('=');
        if (p.length != 2) { continue };
        params[p[0]] = p[1];
      }
    }

    sessionId = params["_rtToken"];
    userId = params["_rtUserId"];

    // retrieve session from redis using the unique key stored in cookies
    console.log('authorize: redis.getSet');
    console.log(userId);
    console.log(sessionId);

    // redis.getSet.hGetAll("rtSession-" + userId)
    const session=  await redisGetSet.HGET(("rtSession-" + userId), sessionId);
    console.log(session);
    if (!session) {
      console.log('redisGetSet.hget err');
      next(new Error('Unauthorized Realtime user (session)'));
    } else {
      console.log('redisGetSet.hget success');
      socket.request.session = JSON.parse(session);
      next();
    };
  });

  // Creating a websocket connection.
  // channel should be realtime_msg ..as requested from client
  io.on("connection", (socket) => {
    socket.on("subscribe", async (channel) => { // works
      console.log('socket subscription: '+channel);
      socket.join(channel);
      await redisClientSub.subscribe(channel, (message) => { //works
        var msg = JSON.parse(message);
        
        console.log('in redis subscribe channel: '+channel);
        
        io.emit(channel, msg, (response) => {   //Failing
            console.log('socket emit response');
            console.log(response); // "got it"
          } );
          console.log('socket after emit:');
      });
    });

    socket.on("unsubscribe", async (channel) => {
      await redisClientSub.unsubscribe(channel);
    });

    
    socket.on('realtime_user_id_connected', (message) => { // works
      console.log('Realtime User ID connected: ' + message.userId);
    });
  });

the package.json:

{
  "name": "headcount-realtime",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start": "node server.js"
  },
  "engines": {
    "npm": "9.4.0",
    "node": "v19.6.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "express": "4.18.2",
    "redis": "4.6.6",
    "socket.io": "4.6.1"
  }
}

When a client connects it sends along a user_id and token to which is verified in the redisGetSet call. The client then emits 'realtime_user_id_connected' and subscribes to the 'realtime_msg' channel. The output on the server is:

> [email protected] start
> node server.js

Server is running
authorize: redis.getSet
4
b7925e14f30b2090750bfe571b0863fd
{"user_id":4}
redisGetSet.hget success
authorize: redis.getSet
4
b7925e14f30b2090750bfe571b0863fd
{"user_id":4}
redisGetSet.hget success
Realtime User ID connected: 4
socket subscription: realtime_msg
{
  msg: {
    resource: 'alerts',
    action: 'update',
    id: 13,
    obj: {
      msg: 'testing testing 1 2 3',
      id: 13
    }
  },
  recipient_user_ids: [ -1, 42 ]
}
in redis subscribe channel: realtime_msg
socket after emit:
socket emit response
Error: operation has timed out
    at Timeout._onTimeout (<project-folder>/node_modules/socket.io/dist/broadcast-operator.js:182:17)

output with with DEBUG enabled:

  express:application set "x-powered-by" to true +0ms
  express:application set "etag" to 'weak' +2ms
  express:application set "etag fn" to [Function: generateETag] +0ms
  express:application set "env" to 'development' +1ms
  express:application set "query parser" to 'extended' +0ms
  express:application set "query parser fn" to [Function: parseExtendedQueryString] +2ms
  express:application set "subdomain offset" to 2 +0ms
  express:application set "trust proxy" to false +0ms
  express:application set "trust proxy fn" to [Function: trustNone] +1ms
  express:application booting in development mode +0ms
  express:application set "view" to [Function: View] +0ms
  express:application set "views" to '/Users/dreadstar/workspace/headcount-realtime/views' +1ms
  express:application set "jsonp callback name" to 'callback' +0ms
  socket.io:server initializing namespace / +0ms
  socket.io:server creating engine.io instance with opts {"cors":{"origin":"*"},"cleanupEmptyChildNamespaces":false,"path":"/socket.io"} +1ms
  socket.io:server attaching client serving req handler +4ms
  express:router use '/' query +583ms
  express:router:layer new '/' +0ms
  express:router use '/' expressInit +1ms
  express:router:layer new '/' +0ms
  express:router:route new '/' +1ms
  express:router:layer new '/' +0ms
  express:router:route get '/' +1ms
  express:router:layer new '/' +0ms
Server is running
  engine intercepting request for path "/socket.io/" +0ms
  engine handling "GET" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8hxeE" +1ms
  engine applying middleware n°1 +1ms
  engine handshaking client "dwnoIS9YABCzpow1AAAA" +3ms
  engine:transport readyState updated from undefined to open (polling) +0ms
  engine:socket readyState updated from undefined to opening +0ms
  engine:socket readyState updated from opening to open +1ms
  engine:socket sending packet "open" ({"sid":"dwnoIS9YABCzpow1AAAA","upgrades":["websocket"],"pingInterval":25000,"pingTimeout":20000,"maxPayload":1000000}) +0ms
  engine:polling setting request +0ms
  engine:socket flushing buffer to transport +0ms
  engine:polling writing "0{"sid":"dwnoIS9YABCzpow1AAAA","upgrades":["websocket"],"pingInterval":25000,"pingTimeout":20000,"maxPayload":1000000}" +1ms
  engine:socket executing batch send callback +11ms
  socket.io:server incoming connection with id dwnoIS9YABCzpow1AAAA +774ms
  engine intercepting request for path "/socket.io/" +25ms
  engine handling "POST" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8hxeo&sid=dwnoIS9YABCzpow1AAAA" +0ms
  engine applying middleware n°1 +1ms
  engine setting new request for existing client +1ms
  engine:polling received "40" +26ms
  engine:socket received packet message +17ms
  socket.io-parser decoded 0 as {"type":0,"nsp":"/"} +0ms
  socket.io:client connecting to namespace / +0ms
  socket.io:namespace adding socket to nsp / +0ms
authorize: redis.getSet
4
b7925e14f30b2090750bfe571b0863fd
  engine applying middleware n°1 +18ms
  engine writing headers: {"Access-Control-Allow-Origin":"*"} +3ms
  engine upgrading existing transport +3ms
  engine:transport readyState updated from undefined to open (websocket) +51ms
  engine:socket might upgrade socket transport from "polling" to "websocket" +22ms
  engine intercepting request for path "/socket.io/" +1ms
  engine handling "GET" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8hxew&sid=dwnoIS9YABCzpow1AAAA" +0ms
  engine applying middleware n°1 +0ms
  engine setting new request for existing client +0ms
  engine:polling setting request +24ms
  engine:ws received "2probe" +0ms
  engine:socket got probe ping packet, sending pong +2ms
  engine:ws writing "3probe" +0ms
{"user_id":4}
redisGetSet.hget success
  socket.io:socket socket connected - writing packet +0ms
  socket.io:socket join room QYUIpAQxlw_bi-yeAAAB +0ms
  socket.io-parser encoding packet {"type":0,"data":{"sid":"QYUIpAQxlw_bi-yeAAAB"},"nsp":"/"} +22ms
  socket.io-parser encoded {"type":0,"data":{"sid":"QYUIpAQxlw_bi-yeAAAB"},"nsp":"/"} as 0{"sid":"QYUIpAQxlw_bi-yeAAAB"} +0ms
  engine:socket sending packet "message" (0{"sid":"QYUIpAQxlw_bi-yeAAAB"}) +4ms
  engine:socket flushing buffer to transport +0ms
  engine:polling writing "40{"sid":"QYUIpAQxlw_bi-yeAAAB"}" +5ms
  engine:ws received "5" +7ms
  engine:socket got upgrade packet - upgrading +3ms
  engine:transport readyState updated from open to closing (polling) +10ms
  engine:polling closing +4ms
  engine:polling transport discarded - closing right away +0ms
  engine:transport readyState updated from closing to closed (polling) +0ms
  engine:socket writing ping packet - expecting pong within 20000ms +25s
  engine:socket sending packet "ping" (undefined) +0ms
  engine:socket flushing buffer to transport +0ms
  engine:ws writing "2" +25s
  engine:socket executing batch send callback +1ms
  engine:ws received "3" +1ms
  engine:socket received packet pong +1ms
  engine:socket got pong +0ms
  engine:transport readyState updated from open to closed (websocket) +34s
  engine:socket readyState updated from open to closed +9s
  socket.io:client client close with reason transport close +34s
  socket.io:socket closing socket - reason transport close +34s
  engine intercepting request for path "/socket.io/" +2m
  engine handling "GET" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8iRx5" +1ms
  engine applying middleware n°1 +2ms
  engine handshaking client "6jpqT3pb-QM5YVaSAAAC" +3ms
  engine:transport readyState updated from undefined to open (polling) +2m
  engine:socket readyState updated from undefined to opening +2m
  engine:socket readyState updated from opening to open +0ms
  engine:socket sending packet "open" ({"sid":"6jpqT3pb-QM5YVaSAAAC","upgrades":["websocket"],"pingInterval":25000,"pingTimeout":20000,"maxPayload":1000000}) +0ms
  engine:polling setting request +2m
  engine:socket flushing buffer to transport +2ms
  engine:polling writing "0{"sid":"6jpqT3pb-QM5YVaSAAAC","upgrades":["websocket"],"pingInterval":25000,"pingTimeout":20000,"maxPayload":1000000}" +1ms
  engine:socket executing batch send callback +3ms
  socket.io:server incoming connection with id 6jpqT3pb-QM5YVaSAAAC +2m
  engine intercepting request for path "/socket.io/" +26ms
  engine handling "POST" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8iRyQ&sid=6jpqT3pb-QM5YVaSAAAC" +0ms
  engine applying middleware n°1 +1ms
  engine setting new request for existing client +0ms
  engine:polling received "40" +24ms
  engine:socket received packet message +22ms
  socket.io-parser decoded 0 as {"type":0,"nsp":"/"} +2m
  socket.io:client connecting to namespace / +2m
  socket.io:namespace adding socket to nsp / +2m
authorize: redis.getSet
4
b7925e14f30b2090750bfe571b0863fd
  engine applying middleware n°1 +9ms
  engine writing headers: {"Access-Control-Allow-Origin":"*"} +1ms
  engine upgrading existing transport +1ms
  engine:transport readyState updated from undefined to open (websocket) +38ms
  engine:socket might upgrade socket transport from "polling" to "websocket" +10ms
  engine intercepting request for path "/socket.io/" +8ms
  engine handling "GET" http request "/socket.io/?_rtUserId=4&_rtToken=b7925e14f30b2090750bfe571b0863fd&EIO=4&transport=polling&t=OX8iRyo&sid=6jpqT3pb-QM5YVaSAAAC" +0ms
  engine applying middleware n°1 +0ms
  engine setting new request for existing client +0ms
  engine:polling setting request +19ms
  engine:ws received "2probe" +2m
  engine:socket got probe ping packet, sending pong +20ms
  engine:ws writing "3probe" +0ms
{"user_id":4}
redisGetSet.hget success
  socket.io:socket socket connected - writing packet +2m
  socket.io:socket join room eDYabUPf_IiKdurPAAAD +0ms
  socket.io-parser encoding packet {"type":0,"data":{"sid":"eDYabUPf_IiKdurPAAAD"},"nsp":"/"} +57ms
  socket.io-parser encoded {"type":0,"data":{"sid":"eDYabUPf_IiKdurPAAAD"},"nsp":"/"} as 0{"sid":"eDYabUPf_IiKdurPAAAD"} +0ms
  engine:socket sending packet "message" (0{"sid":"eDYabUPf_IiKdurPAAAD"}) +28ms
  engine:socket flushing buffer to transport +0ms
  engine:polling writing "40{"sid":"eDYabUPf_IiKdurPAAAD"}" +39ms
  engine:ws received "5" +42ms
  engine:socket got upgrade packet - upgrading +14ms
  engine:transport readyState updated from open to closing (polling) +64ms
  engine:polling closing +16ms
  engine:polling transport discarded - closing right away +0ms
  engine:transport readyState updated from closing to closed (polling) +0ms
  engine:ws received "42["realtime_user_id_connected",{"userId":4}]" +8ms
  engine:socket received packet message +8ms
  socket.io-parser decoded 2["realtime_user_id_connected",{"userId":4}] as {"type":2,"nsp":"/","data":["realtime_user_id_connected",{"userId":4}]} +23ms
  socket.io:socket got packet {"type":2,"nsp":"/","data":["realtime_user_id_connected",{"userId":4}]} +24ms
  socket.io:socket emitting event ["realtime_user_id_connected",{"userId":4}] +2ms
  socket.io:socket dispatching an event ["realtime_user_id_connected",{"userId":4}] +1ms
Realtime User ID connected: 4
  engine:ws received "42["subscribe","realtime_msg"]" +8ms
  engine:socket received packet message +8ms
  socket.io-parser decoded 2["subscribe","realtime_msg"] as {"type":2,"nsp":"/","data":["subscribe","realtime_msg"]} +7ms
  socket.io:socket got packet {"type":2,"nsp":"/","data":["subscribe","realtime_msg"]} +4ms
  socket.io:socket emitting event ["subscribe","realtime_msg"] +0ms
  socket.io:socket dispatching an event ["subscribe","realtime_msg"] +0ms
socket subscription: realtime_msg
  socket.io:socket join room realtime_msg +1ms
  engine:socket writing ping packet - expecting pong within 20000ms +25s
  engine:socket sending packet "ping" (undefined) +3ms
  engine:socket flushing buffer to transport +2ms
  engine:ws writing "2" +25s
  engine:socket executing batch send callback +0ms
  engine:ws received "3" +1ms
  engine:socket received packet pong +1ms
  engine:socket got pong +0ms
  engine:socket writing ping packet - expecting pong within 20000ms +25s
  engine:socket sending packet "ping" (undefined) +0ms
  engine:socket flushing buffer to transport +0ms
  engine:ws writing "2" +25s
  engine:ws received "3" +2ms
  engine:socket received packet pong +3ms
  engine:socket got pong +0ms
{
  msg: {
    resource: 'alerts',
    action: 'update',
    id: 13,
    obj: {
      msg: 'testing testing 1 2 3',
      id: 13,
    }
  },
  recipient_user_ids: [ -1, 42 ]
}
in redis subscrbe channel: realtime_msg
  socket.io:socket emitting packet with ack id 0 +1m
  socket.io-parser encoding packet {"type":2,"data":["realtime_msg",{"msg":{"resource":"alerts","action":"update","id":13,"obj":{"msg":"testing testing 1 2 3","id":13}},"recipient_user_ids":[-1,42]}]"}],"id":0,"nsp":"/"} +1m
  socket.io-parser encoded {"type":2,"data":["realtime_msg",{"msg":{"resource":"alerts","action":"update","id":13,"obj":{"msg":"testing testing 1 2 3","id":13}},"recipient_user_ids":[-1,42]}]","id":0,"nsp":"/"} as 20["realtime_msg",{"msg":{"resource":"alerts","action":"update","id":13,"obj":{"msg":"testing testing 1 2 3","id":13}},"recipient_user_ids":[-1,42]}]" +0ms
  engine:socket sending packet "message" (20["realtime_msg",{"msg":{"resource":"alerts","action":"update","id":13,"obj":{"msg":"testing testing 1 2 3","id":13}},"recipient_user_ids":[-1,42]}]") +15s
  engine:socket flushing buffer to transport +0ms
  engine:ws writing "420["realtime_msg",{"msg":{"resource":"alerts","action":"update","id":13,"obj":{"msg":"testing testing 1 2 3","id":13}},"recipient_user_ids":[-1,42]}]" +15s
socket after emit
  engine:socket writing ping packet - expecting pong within 20000ms +10s
  engine:socket sending packet "ping" (undefined) +0ms
  engine:socket flushing buffer to transport +0ms
  engine:ws writing "2" +10s
  engine:ws received "3" +1ms
0

There are 0 best solutions below