rxjs withLatestFrom unExpected behavior

751 Views Asked by At
import { of, Subject, interval } from 'rxjs';
import {
  tap,
  startWith,
  map,
  delay,
  publishReplay,
  publish,
  refCount,
  withLatestFrom,
  switchMap,
  take
} from 'rxjs/operators';

const a$ = new Subject();

const b$ = a$.pipe(
  map(a => {
    console.log('expensive calculation');
    return a;
  }),
  publishReplay(),
  refCount()
);

const c$ = b$.pipe(withLatestFrom(a$));

b$.subscribe(b => console.log(`b$`));
c$.subscribe(c => console.log(`c$`)); // my problem, why c$ not emit value

of(0).subscribe(a$);

I don't know why 'c$' is not printed here is my online code, https://stackblitz.com/edit/rxjs-pc5y8d?devtoolsheight=60&file=index.ts

3

There are 3 best solutions below

1
On

TLDR;

Working StackBlitz app.


Explained solution

I will start with an interesting observation: if you comment the line with where b$ is subscribed(b$.subscribe(observer('b$'));), the next callback of the c$ observer will be executed:

const a$ = new Subject();

const b$ = a$.pipe(
  map(a => {
    console.log('expensive calculation');
    return a;
  }),
  publish(),
  refCount()
);

function observer(name: string) {
  return {
    next: (value: number) => {
      console.log(`observer ${name}: ${value}`);
    },
    complete: () => {
      console.log(`observer ${name}: complete`);
    }
  };
}

const c$ = b$.pipe(withLatestFrom(a$));
// b$.subscribe(observer('b$'));

c$.subscribe(observer('c$'));

of(0).subscribe(a$);

/*
Console output:

expensive calculation
observer c$: 0,0
observer c$: complete
 */

Another observation is that the same next callback will be invoked also if you change the order of the subscribers: first you subscribe to c$, then to b$:

const a$ = new Subject();

const b$ = a$.pipe(
  map(a => {
    console.log('expensive calculation');
    return a;
  }),
  publish(),
  refCount()
);

function observer(name: string) {
  return {
    next: (value: number) => {
      console.log(`observer ${name}: ${value}`);
    },
    complete: () => {
      console.log(`observer ${name}: complete`);
    }
  };
}

const c$ = b$.pipe(
  withLatestFrom(a$)
);

c$.subscribe(observer('c$'));
b$.subscribe(observer('b$'));

of(0).subscribe(a$);

/*
Console output:

expensive calculation
observer c$: 0,0
observer b$: 0
observer c$: complete
observer b$: complete
*/

We will first understand the why behind these observations and then we will come up with a solution.

Firstly, it's important to know that a Subject keeps track of its subscribers by using a subscribers list. When the subject emits a value(subject.next()), all of the Subject's subscribers will receive that value, based on the order in which they have subscribed to the Subject.

In the first example:

const a$ = new Subject();
const b$ = a$.pipe(/* ... */);
const c$ = b$/* subscriber 2 */.pipe(withLatestFrom(a$/* subscriber 1 */));
// b$.subscribe(observer('b$'));

c$.subscribe(observer('c$'));

when c$ is subscribed, it will first subscribe to each argument passed to withLatestFrom, then to b$. But since b$ is based on a$, it means that the a$ Subject will end up having 2 subscribers.
In order for withLatestFrom to emit a value, 2 conditions must be fulfilled at the same time:

  • its source(in this case, b$) must emit a value
  • all of the withLatestFrom's arguments(which are observables) have emitted at least once

In this case, since the only withLatestFrom's argument is subscribed first to a$, all of the above conditions will be fulfilled and this is why observer c$: 0,0 will be printed to the console.

In the second example:

const a$ = new Subject();
const b$ = a$.pipe(/* ... */);

const c$ = b$.pipe(
  withLatestFrom(a$)
);

c$.subscribe(observer('c$')); /* b$'s first subscription */
b$.subscribe(observer('b$')); /* b$'s second subscription */

of(0).subscribe(a$);

until b$.subscribe(observer('b$'));, the same thing that I described above happens. What b$.subscribe(observer('b$')); does it does is to add another subscriber to a Subject instance, but this time it won't be a$, it will be subject that belongs to publish(). When the first subscription takes place, the publish will create a Subject instance and will add that new subscriber to it, but it will also subscribe to the source(this happens internally). Once again, this happens only on the first subscription. On subsequent subscriptions, the subscribers will be added to the Subject maintained by publish.
The a$ Subject will have 2 subscribers too. One from withLatestFrom and one from the b$'s first subscription.

So, in this case the console will output: observer c$: 0,0 and then observer b$: 0.

The initial problem was that observer c$: 0,0 won't be in the console output.

const c$ = b$.pipe(withLatestFrom(a$) /* (3) */);
b$.subscribe(observer('b$') /* (1) */);

c$.subscribe(observer('c$') /* (2) */);

The reason it happens is because b$ is subscribed first, which means that a$ will have its first subscriber(here is where it happens). Also the publish's Subject will be (1). Next, when c$ is subscribed, (2) will become the second subscriber of the publish's Subject. It's important to notice that a$ won't be subscribed again. However, a$ will get its second subscriber, which is (3). When the a$ emits, the first subscriber to receive the value will be the ones caused by b$, to that second condition of withLatestFrom won't be fulfilled, since its source has emitted, but none of the withLatestFrom's observables have emitted anything yet.

A solution would be this:

/* ... */
const c$ = b$.pipe(
  delay(0),
  withLatestFrom(a$)
);
b$.subscribe(observer('b$'));

c$.subscribe(observer('c$'));

of(0).subscribe(a$);

By using delay(0),, we ensure that no matter the order of the subscribers, withLatestFrom's observable will be the first to receive the value. This was needed because, in this situation, b$ is of the form a$.pipe(...) and withLatestFrom's argument is a$. With delay(0), the withLatestFrom's observable will always receive values first.

As a side note, observeOn(asyncScheduler) could also be used, instead of delay(0). In both cases, the output is:

expensive calculation
observer b$: 0
observer b$: complete
observer c$: 0,0
observer c$: complete
1
On

withLatestFrom must emit a value first

so If you use BehaviorSubject you can do the following:

const _a = new BehaviorSubject<string>('Hello');
const a$ = _a.asObservable();

a$.subscribe();

and it should work.

1
On

Thanks for your answer!I still have some questions:

const c$ = b$.pipe(withLatestFrom(a$) /* (3) */);
b$.subscribe(observer('b$') /* (1) */);
c$.subscribe(observer('c$') /* (2) */);

in this example, can i explain like this?

when b$ is subscribe, it add a first subscriber in b$'s subscriber array

when c$ is subscribe, it first add a subscriber of a$, and add a second subscriber in b$'s subscriber array

when a$ emit an value, it first notify b$'s subscriber array, then notify withLatestFrom observable

when b$'s second subscriber is notified, withLatestFrom observable have not emit value, so it not print anything

@Andrei Gătej