I would like to split a single RxJS stream into two streams, perform a different async task on each, and finally merge the two streams back into one. The best solution I have so far is:
import { fromEvent, debounceTime, map, merge, share } from 'rxjs';
// SOURCE stream
const clicks = fromEvent(document, 'click').pipe(share());
const stream$ = merge(
  clicks.pipe( // stream 1
    debounceTime(1000),
    map((x) => 'debounceA')
  ),
  clicks.pipe( // stream 2
    debounceTime(2000),
    map((x) => 'debounceB')
  )
);
stream$.subscribe((x) => console.log(x));
// OUTPUT:
// debounceA (after 1 second)
// debounceB (after 2 seconds)
Now, what I'm wondering is whether this is the best solution, since clicks.pipe appears twice in my code. I've added share to make sure that upstream code (not in this example) does not run twice. But I was thinking: is it possible with RxJS to somehow swap the clicks.pipe and the merge, something like this:
...
const stream$ = clicks.pipe(
  merge(
    debounceTime(1000).pipe(map((x) => 'debounceA')),
    debounceTime(2000).pipe(map((x) => 'debounceB'))
  )
);
...
Although this doesn't work, I think it somehow makes more sense. Is something like this possible, or are there better RxJS ways to do what I showed here?
If I understand the problem correctly, what you do not like is the duplication of logic across the two clicks.pipe(...) blocks. If that is the case, I would factor the shared logic into a higher-level function.
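Here is a minimal sketch of what such a function could look like. The names debouncedBranch and splitAndMerge are hypothetical helpers made up for illustration, not RxJS APIs, and the sketch assumes RxJS 7.2+, where operators can be imported directly from 'rxjs'.

```typescript
import { Observable, debounceTime, map, merge } from 'rxjs';

// Hypothetical helper: builds one debounced branch that emits a fixed label.
function debouncedBranch<T>(
  source$: Observable<T>,
  ms: number,
  label: string
): Observable<string> {
  return source$.pipe(
    debounceTime(ms),
    map(() => label)
  );
}

// Hypothetical helper: applies every [delay, label] pair to the same source
// and merges the resulting branches back into a single stream.
function splitAndMerge<T>(
  source$: Observable<T>,
  branches: Array<[number, string]>
): Observable<string> {
  return merge(
    ...branches.map(([ms, label]) => debouncedBranch(source$, ms, label))
  );
}

// Usage with the shared click stream from the question (needs a browser DOM):
//   const clicks = fromEvent(document, 'click').pipe(share());
//   const stream$ = splitAndMerge(clicks, [
//     [1000, 'debounceA'],
//     [2000, 'debounceB'],
//   ]);
//   stream$.subscribe((x) => console.log(x));
```

The pipe logic now lives in one place, and adding a third branch is just one more entry in the array. The source still needs share() if its upstream work should not be repeated per branch, since each branch subscribes to the source separately.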
DEMO