I am trying to build a microservice architecture consisting of an api-gateway
and a register-service
microservice. Every request is handled by the api-gateway.
After the user has been registered, I want to send a response back to the caller. I have set everything up, but the api-gateway
shows the following error:
ERROR [ClientKafka] ERROR [Connection] Response Metadata(key: 3, version: 6) {"timestamp":"2022-04-16T07:44:29.289Z","logger":"kafkajs","broker":"example-server-eu1- kafka.upstash.io:9092","clientId":"register_user-client","error":"This server does not host this topic-partition","correlationId":4,"size":250}
The broker URL shown above is a placeholder, for security reasons.
Below is my code:
api-gateway/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { AppController } from './app.controller';
import { AppService } from './app.service';
/**
 * Root module of the API gateway.
 *
 * Registers a Kafka-transport microservice client under the injection token
 * `REGISTER_SERVICE` and declares the gateway's controller and service.
 */
@Module({
  imports: [
    // NOTE(review): the Kafka settings below are read from process.env while this
    // decorator argument is being evaluated. Presumably ConfigModule.forRoot() has
    // to stay ahead of ClientsModule.register() so that the environment is
    // populated first — confirm before reordering.
    ConfigModule.forRoot(),
    ClientsModule.register([
      {
        name: 'REGISTER_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'register_user',
            brokers: [process.env.KAFKA_BROKER],
            ssl: true,
            sasl: {
              mechanism: 'scram-sha-256',
              username: process.env.KAFKA_USERNAME,
              password: process.env.KAFKA_PASSWORD,
            },
          },
          consumer: { groupId: 'register-consumer' },
        },
      },
    ]),
  ],
  providers: [AppService],
  controllers: [AppController],
})
export class AppModule {}
api-gateway/app.controller.ts
import { Body, Controller, Get, Inject, OnModuleInit, Post } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { AppService } from './app.service';
import { UserDto } from './dto/user.dto';
/**
 * HTTP controller of the gateway; its routes live under the `api` prefix.
 */
@Controller('api')
export class AppController implements OnModuleInit {
  constructor(
    private readonly appService: AppService,
    @Inject('REGISTER_SERVICE') private clientKafka: ClientKafka,
  ) {}

  /**
   * Module-init hook: declares that replies to the `ADD_USER` pattern are
   * expected, then connects the Kafka client.
   */
  async onModuleInit() {
    const kafka = this.clientKafka;
    kafka.subscribeToResponseOf('ADD_USER');
    await kafka.connect();
  }

  /**
   * `POST api/register` — passes the request body to the application service
   * and returns whatever that call produces.
   *
   * @param userDto request body
   */
  @Post('register')
  getResponse(@Body() userDto: UserDto) {
    const outcome = this.appService.getResponse(userDto);
    return outcome;
  }
}
api-gateway/app.service.ts
import { Inject, Injectable } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { UserDto } from './dto/user.dto';
import { Model } from 'mongoose';
import { ClientKafka } from '@nestjs/microservices';
/**
 * Gateway-side service that forwards registration requests over the Kafka
 * client injected under the `REGISTER_SERVICE` token.
 */
@Injectable()
export class AppService {
  constructor(
    @Inject('REGISTER_SERVICE') private clientKafka: ClientKafka,
  ) {}

  /**
   * Sends `userDto` with the `ADD_USER` pattern and hands back the value
   * returned by the client's `send` call.
   *
   * @param userDto payload to forward
   */
  async getResponse(userDto: UserDto) {
    const reply = this.clientKafka.send('ADD_USER', userDto);
    return reply;
  }
}
register-service/main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
/**
 * Starts the register-service as a Kafka-transport NestJS microservice.
 *
 * The broker address and SASL credentials are read from the `KAFKA_BROKER`,
 * `KAFKA_USERNAME` and `KAFKA_PASSWORD` environment variables.
 *
 * @returns a promise that rejects if creating or starting the microservice fails
 */
async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: [process.env.KAFKA_BROKER],
        sasl: {
          mechanism: 'scram-sha-256',
          username: process.env.KAFKA_USERNAME,
          password: process.env.KAFKA_PASSWORD,
        },
        ssl: true,
      },
      consumer: {
        groupId: 'register-consumer',
      },
    },
  });

  // Await the listener so that a start-up failure rejects bootstrap() itself
  // instead of turning into a detached promise rejection.
  await app.listen();
}

// Report any start-up failure explicitly and terminate with a non-zero status.
bootstrap().catch((error: unknown) => {
  console.error('register-service failed to start', error);
  process.exit(1);
});
register-service/app.controller.ts
import { Controller, Get } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';
import { AppService } from './app.service';
/**
 * Message controller of the register-service.
 */
@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  /**
   * Handles messages sent with the `ADD_USER` pattern by passing the
   * message's `value` property to the application service.
   *
   * NOTE(review): this assumes the handler argument is an envelope that
   * carries the payload under `.value` — confirm against the NestJS version
   * in use.
   *
   * @param data incoming message
   */
  @MessagePattern('ADD_USER')
  handleRegistration(data) {
    const { value } = data;
    return this.appService.handleRegistration(value);
  }
}
register-service/app.service.ts
import { Injectable } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { User } from './schema/user.schema';
import { Model } from 'mongoose';
/**
 * Persists registration requests through the injected `User` model.
 */
@Injectable()
export class AppService {
  constructor(
    @InjectModel('User') private readonly userModel: Model<User>,
  ) {}

  /**
   * Builds a `User` document from `data`, saves it, and returns a fixed
   * confirmation string.
   *
   * NOTE(review): both the raw input and the saved document are written to
   * the console; if the payload can contain credentials, they end up in the
   * logs — confirm this is intended.
   *
   * @param data fields for the new user document
   * @returns the confirmation message
   */
  async handleRegistration(data) {
    console.log(data);
    const userDocument = new this.userModel(data);
    const savedUser = await userDocument.save();
    console.log(savedUser);
    return "User registered successfully";
  }
}