Let's say we have a source$ observable, typically driven by user interaction. We want to perform an async operation each time source$ emits and "map" the async result to an output observable, say result$.
mergeMap
The most naive implementation is:
result$ = source$.pipe(
  mergeMap((s) => someAsyncOperation(s))
);
However, an earlier response can overwrite the most recent one, because someAsyncOperation may take a different amount of time on each call.
source: -----1-----2------->
result1: -----------|
result2: --|
result: -------------2--1-->
The last value on the result$ observable is 1, which is incorrect: we have already triggered the operation for 2, and response 2 has already arrived.
switchMap
We can replace mergeMap with switchMap, and the diagram becomes:
source: -----1-----2------->
result1: -----------|
result2: --|
result: -------------2----->
For typical use cases like search suggestions, switchMap is desirable, since response-1 is most likely worthless once action-2 has fired.
Problem
But in some other cases, responses to previous actions may still be valid. For example, in a periodic polling scenario, responses are valuable as long as their chronological order is preserved.
source: -----1-----2----3------->
result1: --------|
result2: -----------|
result3: ----|
mergeMap: -----------1------3-2->
switchMap:------------------3--->
expected: -----------1------3--->
Response-1 and response-3 are both desirable, as they arrive in chronological order (while response-2 is invalid because it arrives after response-3).
The problem with mergeMap is that it cannot omit the invalid response-2.
switchMap is also suboptimal, because it omits the desirable response-1: the second inner observable has already started by the time response-1 arrives. The problem with switchMap worsens when the average RTT is larger than the polling interval.
source: -----1----2----3----4----5----->
result1: --------|
result2: --------|
result3: --|
result4: -------|
result5: ----|
switchMap:---------------3------------->
expected: -----------1---3---------4-5->
You can see that switchMap produces far fewer outputs than the ideal.
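To make the three-request diagram above reproducible, here is a minimal sketch that replays it deterministically. It assumes each response can be modelled by a Subject that is resolved by hand instead of a real network call; `replay`, `startRequest` and `respond` are hypothetical helper names, not part of RxJS.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

// A flattening operator factory such as mergeMap or switchMap.
type Flatten = (
  project: (n: number) => Observable<number>
) => OperatorFunction<number, number>;

// Replays: action 1, action 2, response 1, action 3, response 3, response 2.
function replay(flatten: Flatten): number[] {
  const source$ = new Subject<number>();
  const pending = new Map<number, Subject<number>>();
  const seen: number[] = [];

  // Stand-in for someAsyncOperation: the response is delivered manually.
  const startRequest = (n: number): Observable<number> => {
    const response$ = new Subject<number>();
    pending.set(n, response$);
    return response$;
  };

  // Deliver the response for request n, then complete it.
  const respond = (n: number): void => {
    const response$ = pending.get(n);
    if (response$) {
      response$.next(n);
      response$.complete();
    }
  };

  source$.pipe(flatten(startRequest)).subscribe((n) => seen.push(n));

  source$.next(1);
  source$.next(2);
  respond(1); // arrives after action 2 has already fired
  source$.next(3);
  respond(3);
  respond(2); // stale: arrives after response 3
  return seen;
}

const merged = replay((project) => mergeMap(project));
const switched = replay((project) => switchMap(project));

console.log(merged);   // [1, 3, 2]
console.log(switched); // [3]
```

Neither result matches the expected sequence 1, 3: mergeMap lets the stale response-2 through, and switchMap discards the still-useful response-1.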
Question
How should I get the expected output observable in this case?
You can attach an "emission index" to each response and use that to filter out older emissions.
Here we can use scan to keep track of the highest emitted index thus far, then use filter to prevent emissions from older requests.
Here's a working StackBlitz demo.
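The demo code itself is not reproduced here, so the following is only a sketch of the idea described above and may differ from the linked demo. It relies on the fact that the mergeMap projection receives the emission index as its second argument; the operator name `mergeMapDropStale` and the `Indexed` and `Tracked` types are hypothetical.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { filter, map, mergeMap, scan } from 'rxjs/operators';

// A response tagged with the index of the source emission that caused it.
interface Indexed<R> {
  index: number;
  value: R;
}

// scan state: highest index seen so far, and whether the current value is fresh.
interface Tracked<R> {
  maxIndex: number;
  fresh: boolean;
  value?: R;
}

// Hypothetical operator: like mergeMap, but drops any response whose request
// is older than a request that has already produced a response.
function mergeMapDropStale<S, R>(
  project: (s: S) => Observable<R>
): OperatorFunction<S, R> {
  const seed: Tracked<R> = { maxIndex: -1, fresh: false };
  return (source$: Observable<S>): Observable<R> =>
    source$.pipe(
      // Attach the emission index to every response.
      mergeMap((s, index) =>
        project(s).pipe(map((value): Indexed<R> => ({ index, value })))
      ),
      // Track the highest index seen; >= keeps repeat emissions of the newest request.
      scan(
        (state: Tracked<R>, item: Indexed<R>): Tracked<R> =>
          item.index >= state.maxIndex
            ? { maxIndex: item.index, fresh: true, value: item.value }
            : { maxIndex: state.maxIndex, fresh: false },
        seed
      ),
      // Let through only responses that were not overtaken by a newer one.
      filter((state) => state.fresh),
      map((state) => state.value as R)
    );
}
```

With this sketch, the question's pipeline would become `result$ = source$.pipe(mergeMapDropStale((s) => someAsyncOperation(s)))`. Unlike switchMap, it never cancels in-flight requests, so an older response is still emitted as long as no newer response has arrived first.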