Make a ReplaySubject return only the last value on subscribe

2.5k Views Asked by At

I have a strange use case where I need to keep track of all previous emitted events.

Thanks to the ReplaySubject, it works perfectly so far. On every new subscriber, this Subject re-emits every previous events.

Now, for a specific scenario, I need to be able to give only the latest published events (a bit like a BehaviorSubject), but keeping the source the same events.

Here is a snippet of what I'm trying to achieve: stackblitz

import { ReplaySubject, BehaviorSubject, from } from "rxjs";

class EventManager {
  constructor() {
    this.mySubject = new ReplaySubject();
  }

  publish(value) {
    this.mySubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    return this.mySubject.subscribe(next, error, complete);
  }

  subscribe(next, error, complete) {
    return this.mySubject.pipe(/* an operator to get the last one on new subscribe */).subscribe(next, error, complete);
  }
}

const myEventManager = new EventManager();

myEventManager.publish("Data 1");
myEventManager.publish("Data 2");
myEventManager.publish("Data 3");

myEventManager.fullSubscribe(v => {
  console.log("SUB 1", v);
});

myEventManager.subscribe(v => {
  console.log("SUB 2", v);
});

Thank you

3

There are 3 best solutions below

0
On BEST ANSWER

If you keep track of the number of events you've published, you could use skip:

  subscribe(next, error?, complete?) {
    return this.mySubject.pipe(
      skip(this.publishCount - 1)
    ).subscribe(next, error, complete);
  }

Here's a StackBlitz demo.

2
On

Why not just have an instance of ReplaySubject and BehaviorSubject on EventManager?

import { ReplaySubject, BehaviorSubject, from } from "rxjs";

class EventManager {
  constructor() {
    this.replaySubject = new ReplaySubject();
    this.behaviorSubject = new BehaviorSubject();
  }

  publish(value) {
    this.replaySubject.next(value);
    this.behaviorSubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    return this.replaySubject.subscribe(next, error, complete);
  }

  subscribe(next, error, complete) {
    return this.behaviorSubject.subscribe(next, error, complete);
  }
}
0
On

Instead of forcing a ReplaySubject to behave like a BehaviorSubject, you can arrive at ReplaySubject-like behavior by manipulating a BehaviorSubject.

import { BehaviorSubject, from, concat } from 'rxjs';
import { scan, shareReplay } from 'rxjs/operators';

class EventManager {
  constructor() {
    this.mySubject = new BehaviorSubject();
    this.allEmittedValues = this.mySubject.pipe(
      scan((xs, x) => [...xs, x], []),
      shareReplay(1)
    );

    // Necessary since we need to start accumulating allEmittedValues
    // immediately.
    this.allEmittedValues.subscribe();
  }

  dispose() {
    // ends all subscriptions
    this.mySubject.complete();
  }

  publish(value) {
    this.mySubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    // First, take the latest value of the accumulated array of emits and
    // unroll it into an observable
    const existingEmits$ = this.allEmittedValues.pipe(
      take(1),
      concatMap((emits) => from(emits))
    );
    // Then, subscribe to the main subject, skipping the replayed value since
    // we just got it at the tail end of existingEmits$
    const futureEmits$ = this.mySubject.pipe(skip(1));

    return concat(existingEmits$, futureEmits$).subscribe(
      next,
      error,
      complete
    );
  }

  subscribe(next, error, complete) {
    return this.mySubject.subscribe(next, error, complete);
  }
}