Force an observable to throw based on another observable


I have a function named "poll" which enables me to poll an API endpoint.

At some point, I would like to stop the polling through a manual action, here named "manualForceStop", which is a BehaviorSubject initialized with "false". Later (for example, when the user triggers an action), I will set manualForceStop to true. At that moment, I want to unsubscribe and throw a specific error, for example "throwError(() => 'MANUAL_FORCE_STOP')".

I don't know how to do this, since retry will always... retry. Any ideas?

import { BehaviorSubject, Observable, of, retry, switchMap, throwError, timeout } from 'rxjs';

const CONVERT_SECONDS_TO_MS = 1000;
interface PollCmd<TApiArgs, TApiResponse> {
  intervalInSeconds: number;
  timeoutInSeconds: number;
  apiCall: (arg?: TApiArgs) => Observable<TApiResponse>;
  shouldContinuePolling: (arg: TApiResponse) => boolean;
  manualForceStop: BehaviorSubject<boolean>;
}

export function poll<TApiArgs, TApiResponse>(
  input: PollCmd<TApiArgs, TApiResponse>
): Observable<TApiResponse> {
  return input.apiCall().pipe(
    switchMap((apiResponse) => {
      if (input.shouldContinuePolling(apiResponse)) {
        return throwError(() => 'CONDITION_NOT_REACHED');
      }

      return of(apiResponse);
    }),
    retry({ delay: input.intervalInSeconds * CONVERT_SECONDS_TO_MS }),
    timeout({ first: input.timeoutInSeconds * CONVERT_SECONDS_TO_MS })
    /* 
      At some point, manualForceStop passes from "false" to "true" and then,
      I would like to unsubscribe and throw a specific error
    */
  );
}

Vivick On BEST ANSWER

Looking at retry's RetryConfig#delay property I noticed you can pass a function instead of a number.

const CONDITION_NOT_REACHED = 'CONDITION_NOT_REACHED';
export function poll<TApiArgs, TApiResponse>(
  input: PollCmd<TApiArgs, TApiResponse>
): Observable<TApiResponse> {
  return input.apiCall().pipe(
    switchMap((apiResponse) => {
      if (input.shouldContinuePolling(apiResponse)) {
        return throwError(() => CONDITION_NOT_REACHED);
      }

      return of(apiResponse);
    }),
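    retry({ delay: (err, retryCount) => {
      // Returning an observable that errors stops the retries and propagates that error.
      if (err === CONDITION_NOT_REACHED) {
        return throwError(() => CONDITION_NOT_REACHED);
      }

      // Otherwise wait one interval, then retry. timer() with a single argument emits once and completes.
      return timer(input.intervalInSeconds * CONVERT_SECONDS_TO_MS);
    } }),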
    timeout({ first: input.timeoutInSeconds * CONVERT_SECONDS_TO_MS })
    /* 
      At some point, manualForceStop passes from "false" to "true" and then,
      I would like to unsubscribe and throw a specific error
    */
  );
}

The observable returned from the function passed to delay has these properties:

  • When it emits, the retry occurs
  • If it throws, the error will be propagated
  • If it completes without emitting, the error won't be propagated and the resulting observable completes without errors
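
To make the three cases above concrete, here is a small sketch, assuming RxJS 7.2 or later (where operators can be imported from 'rxjs'). The names failing$ and outcome are hypothetical helpers that are not part of the original code, and the notifiers are synchronous only to keep the example short.

```typescript
import { EMPTY, Observable, defer, of, retry, throwError } from 'rxjs';

// Hypothetical source that fails on every subscription and counts its attempts.
let attempts = 0;
const failing$: Observable<never> = defer(() => {
  attempts++;
  return throwError(() => 'BOOM');
});

// Hypothetical helper: subscribes and reports how the observable terminated.
function outcome(obs$: Observable<unknown>): string {
  let result = 'pending';
  obs$.subscribe({
    error: (err) => { result = `error: ${err}`; },
    complete: () => { result = 'complete'; },
  });
  return result;
}

// The notifier emits for the first two failures (retry), then errors (propagated).
const retriedThenFailed = outcome(
  failing$.pipe(
    retry({
      delay: (_err, retryCount) =>
        retryCount < 3 ? of(true) : throwError(() => 'GAVE_UP'),
    })
  )
);

// The notifier completes without emitting: the result completes without an error.
const completedQuietly = outcome(failing$.pipe(retry({ delay: () => EMPTY })));
```

In real polling code the emitting notifier would typically be a timer rather than a synchronous of(...), so that each retry waits for the polling interval.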

You can then subscribe to the polling observable like this:

const myPoll$ = poll(/* [...] */);
myPoll$.subscribe({
  error: err => {
    if (err === CONDITION_NOT_REACHED) {
       // do something here
    }
  },
});
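
The snippet above propagates CONDITION_NOT_REACHED itself. For the question's use case, the same mechanism could instead watch manualForceStop. The following is only a sketch under assumptions: RxJS 7.2 or later, and the hypothetical names stoppablePoll, StoppablePollCmd and intervalInMs, none of which appear in the original code. The timeout operator is left out for brevity.

```typescript
import {
  BehaviorSubject, Observable, filter, merge, of, retry, switchMap, throwError, timer,
} from 'rxjs';

const CONDITION_NOT_REACHED = 'CONDITION_NOT_REACHED';
const MANUAL_FORCE_STOP = 'MANUAL_FORCE_STOP';

// Hypothetical, simplified variant of the question's PollCmd.
interface StoppablePollCmd<T> {
  intervalInMs: number;
  apiCall: () => Observable<T>;
  shouldContinuePolling: (response: T) => boolean;
  manualForceStop: BehaviorSubject<boolean>;
}

function stoppablePoll<T>(input: StoppablePollCmd<T>): Observable<T> {
  // Errors with MANUAL_FORCE_STOP as soon as the flag is, or becomes, true.
  const stop$ = input.manualForceStop.pipe(
    filter((stopped) => stopped),
    switchMap(() => throwError(() => MANUAL_FORCE_STOP))
  );

  return input.apiCall().pipe(
    switchMap((response) =>
      input.shouldContinuePolling(response)
        ? throwError(() => CONDITION_NOT_REACHED)
        : of(response)
    ),
    retry({
      // The notifier either emits after the interval (retry) or errors (stop).
      delay: () => merge(timer(input.intervalInMs), stop$),
    })
  );
}
```

In this sketch the flag is only observed between attempts, so a request that is already in flight still runs to completion. The subscriber can then compare the received error with MANUAL_FORCE_STOP in its error callback.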