import { of, Subject, interval } from 'rxjs';
import {
  tap,
  startWith,
  map,
  delay,
  publishReplay,
  publish,
  refCount,
  withLatestFrom,
  switchMap,
  take
} from 'rxjs/operators';
const a$ = new Subject();
const b$ = a$.pipe(
  map(a => {
    console.log('expensive calculation');
    return a;
  }),
  publishReplay(),
  refCount()
);
const c$ = b$.pipe(withLatestFrom(a$));
b$.subscribe(b => console.log(`b$`));
c$.subscribe(c => console.log(`c$`)); // my problem: why does c$ not emit a value?
of(0).subscribe(a$);
I don't know why 'c$' is never printed. Here is my online code: https://stackblitz.com/edit/rxjs-pc5y8d?devtoolsheight=60&file=index.ts
TLDR;
Working StackBlitz app.
Explained solution
I will start with an interesting observation: if you comment out the line where `b$` is subscribed (`b$.subscribe(observer('b$'));`), the `next` callback of the `c$` observer will be executed.

Another observation is that the same `next` callback will also be invoked if you change the order of the subscribers: first you subscribe to `c$`, then to `b$`.

We will first understand the why behind these observations and then we will come up with a solution.
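Both observations, and the original problem, can be reproduced with a small sketch. The `run` function is a hypothetical helper, and the `observer` log format is an assumption inferred from the `observer c$: 0,0` output quoted in this answer. The pipeline mirrors the one in the question, with the expensive calculation reduced to an identity `map`.

```typescript
import { Subject, of } from 'rxjs';
import { map, publishReplay, refCount, withLatestFrom } from 'rxjs/operators';

// Collected output, so the order of the log lines can be inspected afterwards.
const logs: string[] = [];

// Assumed shape of the `observer(...)` helper referenced in the text.
const observer = (name: string) => (value: unknown) => {
  logs.push(`observer ${name}: ${value}`);
};

// Hypothetical helper: builds a fresh a$/b$/c$ trio, subscribes in the
// requested order, emits 0 through a$, and returns what was logged.
function run(order: Array<'b$' | 'c$'>): string[] {
  logs.length = 0;
  const a$ = new Subject<number>();
  const b$ = a$.pipe(
    map(a => a), // stands in for the expensive calculation
    publishReplay(),
    refCount()
  );
  const c$ = b$.pipe(withLatestFrom(a$));

  for (const name of order) {
    if (name === 'b$') {
      b$.subscribe(observer('b$'));
    } else {
      c$.subscribe(observer('c$'));
    }
  }

  of(0).subscribe(a$);
  return logs.slice();
}

console.log(run(['c$']));       // only c$ subscribed: c$ emits
console.log(run(['c$', 'b$'])); // c$ first, then b$: both emit
console.log(run(['b$', 'c$'])); // b$ first, then c$: c$ stays silent
```

Only the last call, which uses the subscription order from the question, leaves `c$` without a value.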
Firstly, it's important to know that a `Subject` keeps track of its subscribers in a subscribers list. When the subject emits a value (`subject.next()`), all of the Subject's subscribers will receive that value, in the order in which they subscribed to the Subject.

In the first example:
when `c$` is subscribed, it will first subscribe to each argument passed to `withLatestFrom`, then to `b$`. But since `b$` is based on `a$`, the `a$` Subject will end up having 2 subscribers.

In order for `withLatestFrom` to emit a value, 2 conditions must be fulfilled at the same time:

1. its source (`b$`) must emit a value
2. all of `withLatestFrom`'s arguments (which are observables) must have emitted at least once

In this case, since `withLatestFrom`'s only argument (`a$`) is subscribed to first, it receives the value before `b$` emits. Both conditions are therefore fulfilled, and this is why `observer c$: 0,0` will be printed to the console.

In the second example:
until `b$.subscribe(observer('b$'));`, the same thing that I described above happens. What `b$.subscribe(observer('b$'));` does is add another subscriber to a Subject instance, but this time it won't be `a$`; it will be the Subject that belongs to `publish()` (the question uses `publishReplay()`, which works the same way with a `ReplaySubject`). When the first subscription takes place, `publish` will create a Subject instance and add that new subscriber to it, but it will also subscribe to the source (this happens internally). Once again, this happens only on the first subscription. On subsequent subscriptions, the subscribers will be added to the Subject maintained by `publish`.

The `a$` Subject will have 2 subscribers too: one from `withLatestFrom` and one from `b$`'s first subscription.

So, in this case the console will output:
`observer c$: 0,0` and then `observer b$: 0`.

The initial problem was that `observer c$: 0,0` won't be in the console output.

The reason this happens is that `b$` is subscribed first, which means that `a$` will have its first subscriber. Also, the `publish`'s Subject will get its first subscriber, `(1)`, the `b$` observer. Next, when `c$` is subscribed, `(2)`, the subscriber that `withLatestFrom` creates for its source, will become the second subscriber of the `publish`'s Subject. It's important to notice that `a$` won't be subscribed again through `b$`. However, `a$` will get its second subscriber, which is `(3)`, the subscriber created for `withLatestFrom`'s argument. When `a$` emits, the first subscriber to receive the value will be the one caused by `b$`, so the second condition of `withLatestFrom` won't be fulfilled: its source has emitted, but none of `withLatestFrom`'s observables have emitted anything yet.

A solution would be this:
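A minimal sketch of such a fix, assuming `delay(0)` is placed in `c$`'s pipe right before `withLatestFrom` (the exact placement is an assumption, as is the `observer` helper's log format, which is inferred from the output quoted above):

```typescript
import { Subject, of } from 'rxjs';
import { delay, map, publishReplay, refCount, withLatestFrom } from 'rxjs/operators';

// Collected output, so the result can be checked once the delayed value arrives.
const output: string[] = [];

// Assumed shape of the `observer(...)` helper referenced in the text.
const observer = (name: string) => (value: unknown) => {
  const line = `observer ${name}: ${value}`;
  output.push(line);
  console.log(line);
};

const a$ = new Subject<number>();

const b$ = a$.pipe(
  map(a => a), // stands in for the expensive calculation
  publishReplay(),
  refCount()
);

const c$ = b$.pipe(
  delay(0), // assumed placement: pushes the source value to a later event-loop turn
  withLatestFrom(a$)
);

// Same subscription order as in the question: b$ first, then c$.
b$.subscribe(observer('b$'));
c$.subscribe(observer('c$'));

of(0).subscribe(a$);
```

In this sketch `b$` still emits synchronously. Only the value travelling towards `withLatestFrom` is postponed, and by the time it arrives the `a$` argument has already delivered its value.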
By using `delay(0)`, we ensure that no matter the order of the subscribers, `withLatestFrom`'s observable will be the first to receive the value. This is needed because, in this situation, `b$` is of the form `a$.pipe(...)` and `withLatestFrom`'s argument is `a$`. With `delay(0)`, `withLatestFrom`'s observable will always receive values first.

As a side note, `observeOn(asyncScheduler)` could also be used instead of `delay(0)`. In both cases, `observer c$: 0,0` appears in the console output alongside `observer b$: 0`.
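That the two operators are interchangeable here can be checked with a small helper. `runWith` is a hypothetical name, and inserting the operator right before `withLatestFrom` is an assumption about where the rescheduling takes place:

```typescript
import { Subject, of, asyncScheduler, MonoTypeOperatorFunction } from 'rxjs';
import { delay, map, observeOn, publishReplay, refCount, withLatestFrom } from 'rxjs/operators';

// Hypothetical helper: runs the question's scenario (b$ subscribed first)
// with the given rescheduling operator and resolves with the logged lines
// once c$ completes.
function runWith(reschedule: MonoTypeOperatorFunction<number>): Promise<string[]> {
  const logs: string[] = [];
  const a$ = new Subject<number>();
  const b$ = a$.pipe(
    map(a => a), // stands in for the expensive calculation
    publishReplay(),
    refCount()
  );
  const c$ = b$.pipe(reschedule, withLatestFrom(a$));

  b$.subscribe(b => logs.push(`observer b$: ${b}`));

  return new Promise(resolve => {
    c$.subscribe({
      next: c => logs.push(`observer c$: ${c}`),
      complete: () => resolve(logs),
    });
    of(0).subscribe(a$);
  });
}

runWith(delay(0)).then(lines => console.log('delay(0):', lines));
runWith(observeOn(asyncScheduler)).then(lines => console.log('observeOn:', lines));
```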