Deep diving into Rx.ReplaySubject: How to delay `next()`?


Ignoring Block: Either I'm wrong here, or the whiskey is starting to work. (I don't want to rule out that I'm going stupid either. Sorry for that.)

Expectation:

I expected the ReplaySubject to emit one value every 2 seconds, because I wait two seconds before each call to next().

Result:

The result is that there is a wait of 2 seconds, but then all values are output simultaneously.

The code in question is here:

import { ReplaySubject } from 'rxjs';

export const rs$: ReplaySubject<number> = new ReplaySubject();

rs$.subscribe({
  next: (data) => console.log(data),
  error: (error) => console.warn(error),
  complete: () => console.log('ReplaySubject completed'),
});

const fakeAPIValuesOne: Array<number> = [7, 11, 13];

fakeAPIValuesOne.forEach(async (entry: number) => {
  await wait(2000);  // <--- Why does wait() not work?
  rs$.next(entry);
});

function wait(milliseconds: number) {
  return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

Question:

What am I doing fundamentally wrong here?

If you want to try it out: https://stackblitz.com/edit/rxjs-wpnqvq?file=index.ts

EDIT 1:

setTimeout also has no effect. The following code behaves exactly the same as the code above:

fakeAPIValuesOne.forEach((value: number) => {
  setTimeout(() => {
    rs$.next(value);
  }, 2000);
});

I wonder how next() can override all the delays here?

EDIT 2

The problem is solved and the correct answer is marked, thank you! You need the following configuration to use top-level await in your .ts files.

package.json

Please note the "type" field:

{
  "name": "playground",
  "version": "1.0.0",
  "description": "",
  "main": "index.ts",
  "scripts": {
    "start": "nodemon index.ts",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "MIT",
  "dependencies": {
    "rxjs": "^7.5.5",
    "ts-node": "^10.7.0",
    "typescript": "^4.8.0-dev.20220507"
  },
  "type": "module"
}

nodemon.json

Please consider the following config to avoid the error: TypeError [ERR_UNKNOWN_FILE_EXTENSION]: Unknown file extension ".ts"

{
  "execMap": {
    "ts": "node --loader ts-node/esm"
  }
}

Last but not least tsconfig.json

{
  "compilerOptions": {
    "module": "ESNext",
    "target": "ESNext",
    "moduleResolution": "node",
    "esModuleInterop": true,
    "allowSyntheticDefaultImports": true,
    "isolatedModules": true,
    "noEmit": true,
    "strict": true,
    "lib": ["esnext", "DOM"]
  }
}

There are 2 best solutions below

0
On BEST ANSWER

The following note is taken from the MDN Web Docs page on Array.prototype.forEach():

Note: forEach expects a synchronous function.

forEach does not wait for promises. Make sure you are aware of the implications while using promises (or async functions) as forEach callback.

So this issue isn't related to the ReplaySubject; you just cannot use forEach for this use case.
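To make the note concrete, here is a minimal sketch (not taken from the original code) that records the order of events. The name `forEachDemo` and the shortened 50 ms delay are assumptions chosen for illustration; the question used 2000 ms.

```typescript
// Same helper as in the question, typed as Promise<void>.
function wait(milliseconds: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

// Hypothetical demo function: returns the order in which things happened.
async function forEachDemo(delayMs: number): Promise<string[]> {
  const log: string[] = [];

  [7, 11, 13].forEach(async (entry) => {
    await wait(delayMs); // pauses only this one callback, not forEach
    log.push(`value ${entry}`);
  });
  // forEach has already returned here; the three promises created by the
  // async callbacks were discarded, and their timers are running in parallel.
  log.push("forEach returned");

  // Give the three concurrent timers enough time to fire.
  await wait(delayMs * 2);
  return log;
}

forEachDemo(50).then((log) => console.log(log));
// logs: [ 'forEach returned', 'value 7', 'value 11', 'value 13' ]
```

All three callbacks start their timer at practically the same moment, which is why the values arrive together after a single delay.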

Cheers

EDIT: Solved

import { ReplaySubject } from "rxjs";

export const rs$ = new ReplaySubject();

rs$.subscribe({
  next: (data) => console.log(data),
  error: (error) => console.warn(error),
  complete: () => console.log("ReplaySubject completed"),
});

const fakeAPIValuesOne = [7, 11, 13];

// That won't work:
// fakeAPIValuesOne.forEach(async (entry: number) => {
//   await wait(2000);
//   rs$.next(entry);
// });


// That will work (top-level await requires an ES module, see EDIT 2 in the question)
for (const element of fakeAPIValuesOne) {
  await wait(2000);
  rs$.next(element);
}

function wait(milliseconds: number) {
  return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
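The setTimeout variant from EDIT 1 in the question fails for a related reason: forEach registers all three timers in the same tick with the same 2000 ms delay, so they all fire together. If you would rather keep setTimeout, one possible sketch is to stagger the delays by index. The helper name `emitStaggered` and its `emit` callback are assumptions for illustration, not part of RxJS.

```typescript
// Hypothetical helper: schedules one timer per value, each one step later
// than the previous, and resolves once the last value has been emitted.
function emitStaggered<T>(
  values: T[],
  stepMs: number,
  emit: (value: T) => void
): Promise<void> {
  return new Promise((resolve) => {
    if (values.length === 0) {
      resolve();
      return;
    }
    values.forEach((value, index) => {
      setTimeout(() => {
        emit(value);
        if (index === values.length - 1) resolve();
      }, (index + 1) * stepMs); // 1 * step, 2 * step, 3 * step, ...
    });
  });
}

// With the code above you would pass (value) => rs$.next(value) and 2000.
emitStaggered([7, 11, 13], 200, (value) => console.log(value));
```

Unlike the for...of loop, this schedules everything up front, so small timer drift is not carried over from one value to the next.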
1
On

I think it's worth mentioning that you shouldn't (in general) expect this to work.

Consider the following:

wait(2000);
wait(2000);
wait(2000);
console.log("Hello");

"Hello" will be printed to the console right away. It's not going to wait 6 seconds.

That doesn't change if you put them in a loop.

for (const n of [1,2,3]) {
  wait(2000);
}
console.log("Hello");

This will also print "Hello" right away.


If you don't use .then() or await, nothing waits for the promise: the call returns immediately and execution continues with the next line. Some APIs will (of course) await on your behalf, but you should not expect that by default.

You need to write it like this:

await wait(2000);
await wait(2000);
await wait(2000);
console.log("Hello");

Now you will wait 6 seconds before printing "Hello" to the console.
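For completeness, the same sequencing can be expressed with .then() instead of await. This is only a sketch: `waitThreeTimes` is a made-up name, and the delay is shortened to 100 ms so it finishes quickly (use 2000 to match the example above).

```typescript
function wait(milliseconds: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

// Hypothetical helper: chains three waits and resolves with the elapsed time.
function waitThreeTimes(milliseconds: number): Promise<number> {
  const start = Date.now();
  return wait(milliseconds)
    .then(() => wait(milliseconds)) // starts only after the first wait resolved
    .then(() => wait(milliseconds)) // starts only after the second wait resolved
    .then(() => Date.now() - start);
}

waitThreeTimes(100).then((elapsed) => {
  console.log("Hello after roughly", elapsed, "ms");
});
```

Each .then() callback returns the next promise, so the chain only continues once that promise has resolved. That is what await does for you behind the scenes.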


Now look at this:

const createPromise = async (v) => {
   await wait(2000);
   console.log("Hello From createPromise: ", v);
}

createPromise(1);
createPromise(2);
createPromise(3);
console.log("Hello");

You'll notice this isn't really any different. This will print "Hello" right away, and then 2 seconds later all the promises will resolve and print:

Hello
// 2 seconds wait
Hello From createPromise: 1
Hello From createPromise: 2
Hello From createPromise: 3

You'll notice that none of them wait for each other. That's the expected behavior of a promise. You need to use await or .then() in order to actually wait for the result of a promise.

You need to write:

await createPromise(1);
await createPromise(2);
await createPromise(3);
console.log("Hello");

These two loops do the same thing (as expected, they don't wait):

for (const n of [1,2,3]) {
  createPromise(n);
}

[1,2,3].forEach(n => createPromise(n))

console.log("Hello");

Again, the first thing you'll see printed is "Hello" because you never wait for any of these promises to resolve.

You do await the inner wait(2000) promise, but you never await the outer createPromise() promises. So of course you won't wait for them :)

To wait for each one in turn, await it inside a for...of loop:

for (const n of [1,2,3]) {
  await createPromise(n);
}
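If you cannot (or do not want to) enable top-level await, one option is to move the loop into an async function. The sketch below assumes a hypothetical helper `emitSequentially` with an `emit` callback; in the question's code that callback would be (value) => rs$.next(value) and the delay would be 2000.

```typescript
function wait(milliseconds: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

// Hypothetical helper: waits before each value, then hands it to `emit`.
async function emitSequentially<T>(
  values: T[],
  delayMs: number,
  emit: (value: T) => void
): Promise<void> {
  for (const value of values) {
    await wait(delayMs); // the loop body really pauses here
    emit(value);
  }
}

// No top-level await needed: the returned promise is handled with .then().
emitSequentially([7, 11, 13], 100, (value) => console.log(value)).then(() =>
  console.log("all values emitted")
);
```

Because the await sits directly inside the async function's own loop, each iteration starts only after the previous wait has finished.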