How to repeat RxJS from() but skip first emission

I have the following code, which I expect to emit for each member of the scheduleEntries array, with each scheduleEntry repeating forever. I'm trying to skip the first emission of each array member. How can I get it to work?

Right now, with skip(1), it doesn't emit at all.

import { Observable, from, mergeMap, of, repeat, skip, timer } from 'rxjs';

class Schedule {
  private scheduleEntries = [
    { value: true, countdown: 48395 },
    { value: true, countdown: 38395 },
    { value: false, countdown: 42394 },
    { value: true, countdown: 4835 },
  ];

  private lockEmitter$: Observable<ScheduleEntry>;

  constructor() {
    this.lockEmitter$ = from(this.scheduleEntries).pipe(
      skip(1), // skip the first emission for each entry
      mergeMap(entry => of(entry).pipe(
        repeat({ delay: () => this.delayedObservableFactory(entry) }),
      )),
    );
    this.lockEmitter$.subscribe(console.log);
  }

  private delayedObservableFactory(entry: ScheduleEntry) {
    return new Observable(observer => {
      timer(entry.countdown).subscribe(() => observer.next({}));
    });
  }
}


interface ScheduleEntry {
  value: boolean;
  countdown: number;
}

danielRICADO On

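My first thought was that skip needed to move. Looking again, skip(1) where you have it applies to the whole array, so it drops the first entry rather than the first emission of each entry. As your code comment suggests, you need to skip per entry, inside the mergeMap (I missed that detail the first time round). Note that skip has to come after repeat: placed before it, the single value would be skipped on every repetition and nothing would ever emit.

  mergeMap(entry => of(entry).pipe(
    delay(entry.countdown),
    repeat(),
    skip(1), // after repeat(), so only the first emission of this entry is dropped
  )),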
Naren Murali On

When you use from, you need concatMap to make sure the items are processed in order, one after another. Here is a working example:

import { Observable, concatMap, delay, from, of, repeat, skip } from 'rxjs';

class Schedule {
  private scheduleEntries = [
    { value: true, countdown: 48395 },
    { value: true, countdown: 1000 },
    { value: false, countdown: 1000 },
    { value: true, countdown: 1000 },
  ];

  private lockEmitter$: Observable<ScheduleEntry>;

  constructor() {
    this.lockEmitter$ = from(this.scheduleEntries).pipe(
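      skip(1), // drops the first array entry
      // emit the remaining entries one at a time, each after its own countdown
      concatMap((item: ScheduleEntry) => of(item).pipe(delay(item.countdown))),
      repeat() // resubscribe to the whole chain once the last entry has emitted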
    );
    this.lockEmitter$.subscribe(console.log);
  }
}

new Schedule();

interface ScheduleEntry {
  value: boolean;
  countdown: number;
}
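
One consequence of this arrangement is worth spelling out: skip(1) sits on the outer stream, so it drops the first array entry (the 48395 ms one) rather than the first emission of each entry, and concatMap then emits the remaining entries one after another. The sketch below models a single pass in plain TypeScript, without RxJS. The helper name `concatCycle` is made up for illustration, and the model assumes the delays are exact.

```typescript
interface ScheduleEntry {
  value: boolean;
  countdown: number;
}

// Hypothetical helper: models one pass of
// from(entries).pipe(skip(1), concatMap(e => of(e).pipe(delay(e.countdown))))
// and returns each emitted entry with its offset (ms) from the start of the pass.
function concatCycle(entries: ScheduleEntry[]): { entry: ScheduleEntry; at: number }[] {
  let elapsed = 0;
  // slice(1) plays the role of skip(1): the first array entry never emits.
  return entries.slice(1).map((entry) => {
    // concatMap waits for the previous inner observable, so delays add up.
    elapsed += entry.countdown;
    return { entry, at: elapsed };
  });
}

const sampleEntries: ScheduleEntry[] = [
  { value: true, countdown: 48395 },
  { value: true, countdown: 1000 },
  { value: false, countdown: 1000 },
  { value: true, countdown: 1000 },
];

console.log(concatCycle(sampleEntries).map((e) => e.at)); // [1000, 2000, 3000]
```

Because repeat() resubscribes to the whole chain, every later pass would follow the same pattern, starting when the previous one ends.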


maxime1992 On

You say you want to skip the first emission, but looking at your code, I think what you actually want is to delay every emission, including the first one.

Here's how it can be done:

from(scheduleEntries)
  .pipe(
    mergeMap((entry) => 
      of(entry).pipe(
        delay(entry.countdown),
        repeat()
      )
    )
  )
  .subscribe(console.log);

It's not far from what you had, but there's a subtle difference: the delay is applied before the repeat rather than inside it, so even the first emission is delayed (and then repeated indefinitely).
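
To make the timing concrete, here is a small plain-TypeScript sketch (no RxJS involved) that models when a single entry would emit under the delay-then-repeat pattern. The function name `emissionTimes` and its `skipFirst` flag are made up for illustration, and the model assumes every cycle takes exactly `countdown` milliseconds.

```typescript
// Hypothetical helper: models emission timestamps (ms since subscribe) for
// of(entry).pipe(delay(countdown), repeat()), optionally followed by skip(1).
function emissionTimes(countdown: number, horizonMs: number, skipFirst = false): number[] {
  if (countdown <= 0) {
    throw new Error("countdown must be positive");
  }
  const times: number[] = [];
  // Each cycle waits `countdown` ms before emitting, so emissions land at
  // countdown, 2 * countdown, 3 * countdown, ... up to the horizon.
  for (let t = countdown; t <= horizonMs; t += countdown) {
    times.push(t);
  }
  // A skip(1) placed after repeat() would drop only the very first emission.
  return skipFirst ? times.slice(1) : times;
}

console.log(emissionTimes(1000, 3500));       // [1000, 2000, 3000]
console.log(emissionTimes(1000, 3500, true)); // [2000, 3000]
```

In this model nothing is emitted at time 0 either way, which is why delaying the first emission may already be all that is needed.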

Here's a live demo on Stackblitz.

akotech On

You could simplify your code by using the interval function, like this:

this.lockEmitter$ = from(this.scheduleEntries).pipe(
  mergeMap(entry => interval(entry.countdown).pipe(
    map(() => entry) // or use entry.value if you're only interested in the boolean
  )),
);
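
As a rough illustration of what the merged stream looks like, the plain-TypeScript sketch below (no RxJS) lists which entry would emit at which time when every entry runs on its own interval. The helper name `mergedTimeline` and the sample countdowns are assumptions made for the example, and real timers will not be this exact.

```typescript
interface ScheduleEntry {
  value: boolean;
  countdown: number;
}

// Hypothetical helper: models mergeMap(entry => interval(entry.countdown))
// by listing every emission up to `horizonMs`, ordered by time.
function mergedTimeline(
  entries: ScheduleEntry[],
  horizonMs: number,
): { at: number; value: boolean }[] {
  const emissions: { at: number; value: boolean }[] = [];
  for (const entry of entries) {
    if (entry.countdown <= 0) {
      throw new Error("countdown must be positive");
    }
    // interval(n) first emits after n ms, then every n ms after that.
    for (let t = entry.countdown; t <= horizonMs; t += entry.countdown) {
      emissions.push({ at: t, value: entry.value });
    }
  }
  // Merging interleaves the per-entry streams in time order.
  return emissions.sort((a, b) => a.at - b.at);
}

const demoEntries: ScheduleEntry[] = [
  { value: true, countdown: 2000 },
  { value: false, countdown: 3000 },
];

console.log(mergedTimeline(demoEntries, 5000));
// [ { at: 2000, value: true }, { at: 3000, value: false }, { at: 4000, value: true } ]
```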