Wait for all observables in mergeMap to complete before emitting a custom value

I want to process a list of observables concurrently using mergeMap (a.k.a. flatMap) and then emit a single empty value once all inner observables have been processed. Is there an elegant way to achieve this, ideally with a single operator?

Here's the example:

const { of, from } = Rx.Observable;

from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .mergeMap(number => multiply(number), 2) // processing two numbers at a time
  .last() // waiting for all inner observables to complete
  .map(_ => undefined) // casting a value returned by last() to an empty value
  .subscribe()
;

function multiply(number) {
  return of(number * 2) // multiplying the number
    .delay(200) // adding a slight delay
  ;
}

I know I can use toArray() or last() to wait for all inner observables to complete, but then I need to map the result to an empty value with the map() operator (as in my example above).

I guess I'm looking for an operator with the following semantics: emit X when the source observable completes. For example (emitOnComplete is a made-up name):

from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .mergeMap(number => multiply(number), 2)
  .emitOnComplete(undefined)
  .subscribe(console.log) // we should get undefined here
;

Answer by Slava Fomin II:

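The most convenient way that I've managed to find so far is combining the ignoreElements operator with endWith. ignoreElements drops every value but lets the completion through, and endWith appends one final value right before that completion: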

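// endWith is a pipeable operator (RxJS 6+), so the chain goes through pipe()
from([1, 2, 3, 4, 5])
  .pipe(
    mergeMap(number => multiply(number), 2),
    ignoreElements(), // drop every value, keep the completion
    endWith(undefined), // emit undefined right before completing
  )
  .subscribe(console.log) // getting undefined here
;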

However, it would be really nice to have a single operator for this.

Answer by martin:

There's actually one sneaky solution using reduce(). It emits only once, when its source observable completes, so you can use it to ignore all values and just return the seed value:

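import { from } from 'rxjs';
import { mergeMap, reduce } from 'rxjs/operators';

from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .pipe(
    mergeMap(number => multiply(number), 2),
    // the accumulator never changes, so the seed is what gets emitted on completion
    reduce((acc, value) => acc, undefined),
  )
  .subscribe(console.log);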

Live demo: https://stackblitz.com/edit/rxjs-tx6bbe

Btw, fun fact: the same trick with reduce() is used inside Angular's Router package (just without the seed value).