Nestjs: Unable to implement WebSocketGateway with multiple pods

313 Views Asked by At

I have multinode cluster and am running NestJS app with 2 pods. Locally it works fine, since its a single node, but fails on production with multiple nodes i.e. app deploys successfully but does not load. I have a /docs endpoint which is expected to load swagger UI and after this change, apps stays in loading state, before it times out.

I am using @nestjs/websockets and socket.io for this and have referred documentation:

https://docs.nestjs.com/websockets/gateways

Followed this blog and socket.io documentation for sticky session implementation:

I am deploying my web service on digitalocean platform (I have created an App).

Here is the main.ts file:

import {ValidationPipe} from '@nestjs/common';
import {NestFactory} from '@nestjs/core';
import {DocumentBuilder, SwaggerCustomOptions, SwaggerModule} from '@nestjs/swagger';
import {AppModule} from './app.module';
import * as cluster from 'cluster';
import {cpus} from 'os';
import {RedisIoAdapter} from '../redis-adapter/redis-io-adapter';
import {fingerprint32} from 'farmhash';
import * as net from 'net';

export const clusterModule = cluster as unknown as cluster.Cluster;
const numCPUs = cpus().length;
const workers: any = {};

const getWorkerIndex = (ip: string) => {
  return fingerprint32(ip) % numCPUs;
};

async function bootstrap() {
  if (clusterModule.isPrimary) {
    console.log(`Master server started,proccess.pid:${process.pid}, number of cpus: ${numCPUs}`);
    for (let i = 0; i < numCPUs; i++) {
      workers[i] = clusterModule.fork();
      workers[i].on('exit', (_: cluster.Worker, code: any, signal: any) => {
        console.log(`Worker with code: ${code} and signal: ${signal} is Restarting...`);
        workers[i] = clusterModule.fork();
      });
    }
    net
      .createServer({pauseOnConnect: true}, (connection) => {
        const workerIndex = getWorkerIndex(connection.remoteAddress as string);
        workers[workerIndex].send('sticky-session:connection', connection);
      })
      .listen(process.env.npm_package_config_port || 4001);
  } else {
    const app = await NestFactory.create(AppModule);
    app.enableCors();

    // swagger setup


    const redisIoAdapter = new RedisIoAdapter(app);
    await redisIoAdapter.connectToRedis();

    app.useWebSocketAdapter(redisIoAdapter);

    const appServer = await app.listen(0);
    process.on('message', (message, connection: any) => {
      if (message !== 'sticky-session:connection') {
        return;
      }
      appServer.emit('connection', connection);
      connection.resume();
    });
  }
}

bootstrap();

redis-io-adapter code:

import {IoAdapter} from '@nestjs/platform-socket.io';
import {ServerOptions} from 'socket.io';
import {createAdapter} from '@socket.io/redis-adapter';
import {createClient} from 'redis';

export class RedisIoAdapter extends IoAdapter {
  private adapterConstructor: ReturnType<typeof createAdapter> | undefined;

  async connectToRedis(): Promise<void> {
    const pubClient = createClient({url: `redis://<redis-url>`});
    const subClient = pubClient.duplicate();

    await Promise.all([pubClient.connect(), subClient.connect()]);

    this.adapterConstructor = createAdapter(pubClient, subClient);
  }

  createIOServer(port: number, options?: ServerOptions): any {
    const server = super.createIOServer(port, options);
    server.adapter(this.adapterConstructor);
    return server;
  }
}

websocket gateway code

import {SubscribeMessage, WebSocketGateway} from '@nestjs/websockets';
import {Socket} from 'socket.io';

@WebSocketGateway()
export class MyGateway {
  @SubscribeMessage('message')
  handleMessage(client: Socket, payload: string): void {
    client.emit('replyMessage', {
      message: payload,
      replyMessage: `reply message from worker with process.pid:
       ${process.pid}`,
    });
  }
}

Here's the App spec file:

domains:
- domain: some.demo.org
  type: PRIMARY
  zone: some.org
features:
- buildpack-stack=some-linux-value
ingress:
  rules:
  - component:
      name: some-microservices
    match:
      path:
        prefix: /
name: some-api
region: some-region
services:
- alerts:
  // alerts
  dockerfile_path: Dockerfile
  envs:
  // some envs

How do I fix this or is there an alternative to this, maybe modifying App spec?

0

There are 0 best solutions below