Infinite retries when using RpcFilter in NestJS microservice setup with Kafka

927 Views Asked by At

I am new to Kafka and I am experiencing a mixed behaviour when trying to setup proper error handling on my consumer when there is an error. In few instances I am observing retry policy in action - kafka retries my message 5 times(as what I configured) then consumer crashes, then recovers and my group rebalanaces. However, in other instances that's not happens - consumer crashes, then recovers and my group rebalances and consumer attempts to consume the message again and again, inifinitely.

Let's say I have a controller method that's subscribed to a Kafka topic

@EventPattern("cat-topic")
  public async createCat(
    @Payload()
    message: CatRequestDto,
    @Ctx() context: IKafkaContext
  ): Promise<void> {
    try {
      await this.catService.createCat(message);
    } catch (ex) {
      this.logger.error(ex);
      throw new RpcException(
        `Couldn't create a cat`
      );
    }
  }

Using RpcFilter on this method, like this one - https://docs.nestjs.com/microservices/exception-filters :


import { Catch, RpcExceptionFilter, ArgumentsHost } from '@nestjs/common';
import { Observable, throwError } from 'rxjs';
import { RpcException } from '@nestjs/microservices';

@Catch(RpcException)
export class ExceptionFilter implements RpcExceptionFilter<RpcException> {
  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
    return throwError(() => exception.getError());
  }
}

I feel like it might be something funky happening with properly committing offsets or something else. Can't pinpoint it.

Any comments are suggestions are greatly appreciated.

1

There are 1 best solutions below

1
On

Regarding this NestJs Docs it's normal behavior if you use @EventPattern along with RPC exception.

but the main issue is that the exception will keep firing even after the number of retries exceeded but by replacing the original Kafka server with the fixed one it solves the main issue

here is what you should do

  1. define a new class that is being extended from ServerKafka
import {
  KafkaOptions,
  KafkaRetriableException,
  ServerKafka,
} from '@nestjs/microservices';
import { Observable, ReplaySubject } from 'rxjs';

export class KafkaServerWithBugFixed extends ServerKafka {
  constructor(kafkaOptions: KafkaOptions) {
    super(kafkaOptions['options']);
  }
  combineStreamsAndThrowIfRetriable(
    response$: Observable<any>,
    replayStream$: ReplaySubject<unknown>,
  ) {
    return new Promise<void>((resolve, reject) => {
      response$.subscribe({
        next: (val) => {
          replayStream$.next(val);
          resolve();
        },
        error: (err) => {
          if (err instanceof KafkaRetriableException) {
            reject(err);
          }
          replayStream$.error(err);
          resolve();
        },
        complete: () => replayStream$.complete(),
      });
    });
  }
}

  1. Init the Main nestjs microservice app with the previous class like this
  app.connectMicroservice<MicroserviceOptions>(
    {
      strategy: new KafkaServerWithBugFixed(kafkaOptions),
    },
    {
      inheritAppConfig: true,
    },
  );