Before coming here I have read the official documentation of Rxjs and some other pages but I am still not clear. What I understood is this:
It is used to "join" 2 observables and thus obtain a single observable as a result, I also saw that it is used to "flatten" an observable (I am also not very clear).
Now ... I have days trying to program a user registry using Angular and Node.js with Express and I found a little tutorial which I decided to use and it has this code:
import { Injectable, Injector } from '@angular/core';
import { HttpClient, HttpInterceptor, HttpRequest, HttpHandler, HttpEvent, HttpErrorResponse } from '@angular/common/http';
import { Observable, throwError } from 'rxjs';
import { catchError, retry, mergeMap } from 'rxjs/operators'
import { AuthenticationService } from './authentication.service';
@Injectable({
providedIn: 'root'
})
export class AppInterceptor implements HttpInterceptor {
constructor(private injector: Injector) { }
intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
let accessToken = "", refreshToken = ""
const tokens = JSON.parse(sessionStorage.getItem("tokens"))
if (tokens) {
accessToken = tokens.accessToken
refreshToken = tokens.refreshToken
}
let clonHttp: HttpRequest<any>
clonHttp = tokens ? req.clone({ headers: req.headers.append("Authorization", `Bearer ${accessToken}`) }) : req
let auth = this.injector.get(AuthenticationService);
return next.handle(clonHttp)
.pipe(
catchError((error: HttpErrorResponse) => {
if (error.error instanceof ErrorEvent) {
console.log("error event")
} else if (error.status == 401) {
return auth.getNewAccessToken(refreshToken)
.pipe(
retry(3),
mergeMap(
(response: any) => {
tokens.accessToken = response.accessToken
sessionStorage.setItem("tokens", JSON.stringify(tokens))
clonHttp = req.clone({ headers: req.headers.append("Authorization", `Bearer ${response.accessToken}`) })
return next.handle(clonHttp)
}
)
)
} else if (error.status == 409) {
return throwError("User not logged")
} else {
if (error.error && error.error.message) {
return throwError(error.error.message)
} else {
return throwError("Check your connection")
}
}
})
)
}
}
If you see, when you use the MergeMap operator they only pass you the answer (a single observable), or at least that's what I can see. What I'm trying to say is that I don't see that they are using it with 2 observables or to mix 2 observables, which is what I have read in their official documentation, in fact, in the examples they show they always use it with 2 observables.
Honestly it has been too difficult for me to understand this operator, if someone could help me understand it in a simple way, I would be extremely grateful, in addition to understanding its use in that code that I show earlier. Greetings in advance. Thank you!
mergeMap, like many other so-called higher order mapping operators, maintains one or multiple inner observables.An inner observable is created with the outer value and the provided function. The outer value essentially is just the value received from its source. For example:
When an outer value comes in, a new inner observable will be created. I think the best way to understand this is to have a look at the source code:
Please disregard for now
concurrentandbuffer, we'll have a look at them a bit later.Now, what happens when an inner observable emits ? Before going any further, it's worth mentioning that, although it's obvious, an inner observable requires an inner subscriber. We can see this in the
_innerSubmethod from above:When an inner observable emits, the
notifyNextmethod will be called:Where destination points to the next subscriber in the chain. For example, it can be this:
This will be explained in more detail in What about the next subscriber in the chain below.
So, what does it mean to
to mix 2 observables?Let's see this example:
When
2arrives,mergeMapwill subscribe to an inner observable that will emit in200ms. This is an asynchronous action, but notice that the outer values(2, 3, 1) arrive synchronously. Next,3arrives and will create an inner obs. that will emit in300ms. Since the current script has not finished executing yet, the callback queue is not yet considered. Now1arrives, and will create an inner obs. that will emit in100ms.mergeMaphas now 3 inner observables and will pass along the inner value of whichever inner observable emits.As expected, we get
1,2,3.So that's what
mergeMapdoes. Mixing observables can be thought of this way: if an outer value comes and an inner observable has already been created, thenmergeMapsimply says: "no problem, I'll just create a new inner obs. and subscribe to it".What about
concurrentandbuffermergeMapcan be given a second argument,concurrentwhich indicates how many inner observables should handle at the same time. These number of active inner observables is tracked with theactiveproperty.As seen in
_nextmethod, ifactive >= concurrent, theouterValueswill be added to abuffer, which is a queue(FIFO).Then, when one active inner observable completes,
mergeMapwill take the oldest value from the value and will create an inner observable out of it, using the provided function:With this in mind,
concatMap(project)is justmergeMap(project, 1).So, if you have:
this will be logged:
2 \n 3 \n 1.What about the next subscriber in the chain
Operators are functions that return another function which accepts an observable as their only parameter and return another observable. When a stream is being subscribed to, each observable returned by an operator will have its own subscriber.
All these subscribers can be seen as a linked list. For example:
S{n}is the parent(destination) ofS{n+1}, meaning thatS{1}is the destination ofS{2},S{2}is the destination ofS{3}and so forth.StackBlitz
Unexpected results
Compare these:
As per MDN:
This section by MDN should clarify things as well.
I'd say this is environment-specific, rather than RxJs-specific.
In the second snippet, the delays are consecutive so that's why you're getting unexpected results. If you increase the delays just a bit, like:
timer(v * 2), you should get the expected behavior.