How to chain api calls using rxjs?

850 Views Asked by At

I understand that there are lot of questions on the title's subject but I wasn't able to find clear and concise question/response that would help me with following.

Assume we have x amount of web api calls that return observables Observable<cs[]>... See

  init() {
    this.service.serviceA().subscribe({
      next: (as) => {
        this.as = as;
        this.service.serviceB().subscribe({
          next: (bs) => {
            this.bs = bs;
            this.service.serviceC().subscribe({
              next: (cs) => {
                this.cs = cs;
                this.restructureABCs();
              },
              error: (e) => this.notification.showError('Failed to load cs! Error: ' + e.messageerror)
            });
          },
          error: (e) => this.notification.showError('Failed to load bs! Error: ' + e.messageerror)
        });
      },
      error: (e) => this.notification.showError('Failed to load as! Error: ' + e.messageerror)
    })
  }

Function restructureABCs() depends on as/bs & cs. as, bs & cs don't depend on each other. Current implementation is ugly and can be improved but I'm not sure which operator I should use. It doesn't make sense to me to use concatMap as (according to my possibly flawed understanding) multiple streams are going to be merged into one, which I don't want. All I need is to make sure that as/bs & cs are loaded before restructureABCs() function is invoked.

Angular13, rxjs7.4

2

There are 2 best solutions below

0
Phong Cao On

If BS, CS, AS don't depend on each other, they can run parallel, all you need is make sure all of this Observables complete to run restructureABCs(). So you can use combineLatest/forkJoin for this case.

combineLatest([
   this.service.serviceA().pipe(catchError(er => ...)),
   this.service.serviceB().pipe(catchError(er => ...)),
   this.service.serviceC().pipe(catchError(er => ...))
]).subscribe(response => {
      this.as = response[0];
      this.bs = response[1];
      this.cs = response[2];

      restructureABCs()
})
0
krul On

Here is the solution I come up with using forkJoin.

as : Array<as>;
bs : Array<bs>;
cs : Array<cs>;
as$: Observable<as[]> = this.service.serviceA();
bs$: Observable<bs[]> = this.service.serviceB();
cs$: Observable<cs[]> = this.service.serviceC();

init() {


forkJoin([
      this.as$.pipe(catchError(er => of({errMsg: 'Failed to load as! Error: ', error: er}))),
      this.bs$.pipe(catchError(er => of({errMsg: 'Failed to load bs! Error: ', error: er}))),
      this.cs$.pipe(catchError(er => of({errMsg: 'Failed to load cs! Error: ', error: er})))
    ])
      .subscribe({
        next: ([a, b, c]) => {
          this.as = a as Array<as>;
          this.bs = b as Array<bs>;
          this.cs = c as Array<cs>;
          this.restructureABCs();
        },
        error: (e) => this.notification.showError(e.errMsg)
      });
}

Found helpful article about handling errors in forkjoin: