Multiple sequential Observables depend on a single Observable

57 Views Asked by At

I am curious about a better way to write the following scenario.

Assume I have a userObservable fetches user data (user) from db then I'd like to

  • Validate some input with user, validateObservable
  • After get some external data with user, getExternalObservable
  • After combine externalData with user and update db again, updateDbObservable
  • In the end I'd like to get this updated user data.

Normally I can do this as below, but this code looks repetitive and produces a little bit ugly map -> forkJoin -> mergeAll boilerplate.

userObservable.pipe(
  map((user) => {
    return forkJoin({
      user: of(user),
      verifyResult: verifyObservable(user)
    });
  },

  mergeAll(),

  map(({ user }) => {
    return forkJoin({
      user: of(user),
      externalData : getExternalObservable(user)
    });
  }),
  
  mergeAll(),

  map(({user, externalData}) => {
    return updateDbObservable(user, externalData);
  }),

  mergeAll(),
);
    

Thank you !

2

There are 2 best solutions below

2
BizzyBob On

You can greatly simplify your code by doing the following:

  • define a verifiedUser observable that only emits when verification passes
  • use nested pipes so the most inner call has access to emissions from other observables
const verifiedUser = userObservable.pipe(
  switchMap(user => verifyObservable(user).pipe(
    catchError(() => EMPTY),
    map(() => user)
  ))
);

Here we declare an observable that will emit the user only when verification passes.

const updateDbResult = verifiedUser.pipe(
  switchMap(user => getExternalObservable(user).pipe(
    switchMap(externalData => updateDbObservable(user, externalData))
  ))
);

Here we add a .pipe() to each observable inside the switchMap:

When verifiedUser emits, the results flow to getExternalObservable(), then finally the updateDbObservable().

Notice that the most inner call has access to all prior emitted values so there's no need to fabricate an object using forkJoin / of.


Separate Observables

In the above code we've defined separate smaller observables that can make it easier to follow the logic. If we need to alter what it means to be a "verified user", we don't need to change our updateDbResult observable at all!

3
Vivick On
userObservable.pipe(
  switchMap(user =>
    of(user).pipe(
      tap(verifyObservable),
      catchError(() => EMPTY), // means "skip if an error occurs"
      // In our case, that means if it wasn't verified/validated
    )
  )
  switchMap(user => 
    getExternalObservable(user).pipe(
      map(externalData => ({ user, externalData })),
    )
  ),
  switchMap(({ user, externalData }) => {
    // I'll assume `updateDbObservable` emits the updated user value as well
    // otherwise, simply pipe a `map(() => user)` to the result of `updateDbObservable`
    return updateDbObservable(user, externalData);
  }),
  share(), // Avoids duplicating DB queries if you have multiple subscriptions to this observable
);

Unlike Promises, we don't have a nice alternative syntax like async/await. So we're forced to use the "continuation tricks" we did with promises to drag the previous result along. In the case of Promises, .then is both map and flatMap. Here, map is map and switchMap (or mergeMap if you don't want the switching optimization) is flatMap.