Unable to set up request/response-based messaging in a Kafka NestJS microservice

2.4k Views Asked by At

I am trying to build a microservice architecture with an api-gateway and a register-service microservice. Every request is handled by the api-gateway; after the user has been registered, I want to send a response back. I have set everything up, but the api-gateway shows an error like this:

ERROR [ClientKafka] ERROR [Connection] Response Metadata(key: 3, version: 6) 
{"timestamp":"2022-04-16T07:44:29.289Z","logger":"kafkajs","broker":"example-server-eu1- 
kafka.upstash.io:9092","clientId":"register_user-client","error":"This server does not host 
this topic-partition","correlationId":4,"size":250}

The broker URL shown above is a placeholder, for security reasons.

Below is my code:

api-gateway/app.module.ts

import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { AppController } from './app.controller';
import { AppService } from './app.service';

  @Module({
  imports: [
    // Kept ahead of ClientsModule.register(): array elements are evaluated in
    // order, so forRoot() runs before the Kafka options below read process.env.
    // NOTE(review): presumably this is what makes .env values visible here — confirm.
    ConfigModule.forRoot(),
    ClientsModule.register([
      {
        // Injection token used with @Inject('REGISTER_SERVICE').
        name: 'REGISTER_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'register_user',
            brokers: [process.env.KAFKA_BROKER],
            sasl: {
              mechanism: 'scram-sha-256',
              username: process.env.KAFKA_USERNAME,
              password: process.env.KAFKA_PASSWORD,
            },
            ssl: true,
          },
          consumer: {
            groupId: 'register-consumer',
          },
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
/**
 * Root module of the api-gateway: loads configuration and registers a Kafka
 * client under the 'REGISTER_SERVICE' token, using broker address and SASL
 * credentials taken from environment variables.
 */
export class AppModule {}

api-gateway/app.controller.ts

import { Body, Controller, Get, Inject, OnModuleInit, Post } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { AppService } from './app.service';
import { UserDto } from './dto/user.dto';

/**
 * HTTP controller of the api-gateway, mounted under the `api` route prefix.
 * Holds the Kafka client registered as 'REGISTER_SERVICE' and delegates the
 * actual request handling to AppService.
 */
@Controller('api')
export class AppController implements OnModuleInit {
  constructor(
    private readonly appService: AppService,
    @Inject('REGISTER_SERVICE') private clientKafka: ClientKafka,
  ) {}

  /**
   * Subscribes the Kafka client to responses for the `ADD_USER` pattern and
   * then connects it. The subscription is made before connecting.
   *
   * NOTE(review): per the NestJS Kafka docs the reply topic derived from this
   * pattern is `ADD_USER.reply`; if the broker does not auto-create topics it
   * presumably has to exist beforehand — confirm against the broker setup.
   */
  async onModuleInit() {
    this.clientKafka.subscribeToResponseOf('ADD_USER');

    await this.clientKafka.connect();
  }

  /**
   * POST `api/register`: forwards the request body to AppService and returns
   * whatever the service hands back.
   */
  @Post('register')
  getResponse(@Body() userDto: UserDto) {
    const serviceResult = this.appService.getResponse(userDto);
    return serviceResult;
  }
}

api-gateway/app.service.ts

import { Inject, Injectable } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { UserDto } from './dto/user.dto';
import { Model } from 'mongoose';
import { ClientKafka } from '@nestjs/microservices';

/**
 * Gateway-side service that forwards registration requests through the Kafka
 * client injected under the 'REGISTER_SERVICE' token.
 */
@Injectable()
export class AppService {
  constructor(
    @Inject('REGISTER_SERVICE') private clientKafka: ClientKafka,
  ) {}

  /**
   * Sends `userDto` on the `ADD_USER` pattern and returns the result of
   * `send`. Because the method is `async`, callers receive that result
   * wrapped in a Promise.
   */
  async getResponse(userDto: UserDto) {
    const reply = this.clientKafka.send('ADD_USER', userDto);
    return reply;
  }
}

register-service/main.ts

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

/**
 * Creates the register-service as a Nest microservice on the Kafka transport
 * and starts it listening. Broker address and SASL credentials are read from
 * the KAFKA_BROKER, KAFKA_USERNAME and KAFKA_PASSWORD environment variables.
 *
 * @returns a promise that settles once `listen()` has settled; it rejects if
 *          creating or starting the microservice fails.
 */
async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: [process.env.KAFKA_BROKER],
        sasl: {
          mechanism: 'scram-sha-256',
          username: process.env.KAFKA_USERNAME,
          password: process.env.KAFKA_PASSWORD,
        },
        ssl: true,
      },
      consumer: {
        groupId: 'register-consumer',
      },
    },
  });

  // Awaited so that a failure while starting the listener rejects bootstrap()
  // instead of being left as a floating promise.
  await app.listen();
}

// Handle startup failure explicitly: log the cause and exit non-zero rather
// than relying on the runtime's unhandled-rejection behaviour.
bootstrap().catch((err: unknown) => {
  console.error('register-service failed to start:', err);
  process.exit(1);
});

register-service/app.controller.ts

import { Controller, Get } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';
import { AppService } from './app.service';

/**
 * Message-facing controller of the register-service; delegates the work to
 * AppService.
 */
@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  /**
   * Handler for the `ADD_USER` message pattern. Passes the `value` field of
   * the incoming argument to the service and returns the service's result.
   */
  @MessagePattern('ADD_USER')
  handleRegistration(data) {
    const payload = data.value;
    return this.appService.handleRegistration(payload);
  }
}

register-service/app.service.ts

import { Injectable } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { User } from './schema/user.schema';
import { Model } from 'mongoose';

/**
 * Register-service persistence layer built on the injected Mongoose model
 * registered under the name 'User'.
 */
@Injectable()
export class AppService {
  constructor(
    @InjectModel('User') private readonly userModel: Model<User>,
  ) {}

  /**
   * Logs the incoming data, saves it as a new model instance, logs the saved
   * result and resolves to a fixed confirmation string.
   */
  async handleRegistration(data) {
    console.log(data);

    const userDoc = new this.userModel(data);
    const savedUser = await userDoc.save();
    console.log(savedUser);

    return "User registered successfully";
  }
}


        
0

There are 0 best solutions below