Create an Observable that delays the next value

1.4k Views Asked by At

I'm trying to create an observable using RxJS that does what is pictured.

Expected observable mapping

  • Grabs a value and waits a fixed period of time before getting the next one.
  • The next one will be the last value emitted in the period of the wait, skipping the rest.
  • If a wait interval goes by in which no value was emitted, the next value should be grabbed immediately, as the last example in the image depicts.
2

There are 2 best solutions below

2
On BEST ANSWER

This should do the trick.

// RxJS (the `rx` package) supplies Observable and the disposable helpers.
var Rx = require('rx');

// Demo input: a counter that ticks every 10 ms and stops after 100 values.
var source = Rx.Observable.interval(10).take(100);

// Pre-bound logger so it can be handed directly to subscribe().
var log = console.log.bind(console);

// Wraps `source` so that a value is emitted immediately, then further values
// are held back for a fixed window. When the window closes, the most recent
// value received during it (if any) is emitted and a new window starts; if
// nothing arrived, the next source value is emitted immediately.
Rx.Observable.create(function (observer) {

    var DELAY_MS = 1000,   // length of the wait window, in milliseconds
        delaying = false,  // true while a wait window is open
        hasValue = false,  // true when `value` was received during the window
        complete = false,  // source finished while a value was still cached
        timerId  = null,   // pending setTimeout handle, or null when idle
        value;             // latest value received from the source

    // Source emission: remember it, and emit right away unless we are waiting.
    function onNext (x) {
      value = x;
      if (delaying) {
        hasValue = true;
      } else {
        sendValue();
      }
    }

    // Emits the cached value and opens a new wait window (or completes).
    function sendValue () {
      // Open the window BEFORE emitting so that a source value produced
      // synchronously by the downstream observer is cached rather than
      // emitted immediately with a second, overlapping timer.
      delaying = true;
      observer.onNext(value);
      if (complete) {
        observer.onCompleted();
      } else {
        timerId = setTimeout(callback, DELAY_MS); // exercise for the reader. Use a scheduler.
      }
    }

    // Window closed: flush the cached value if there is one, else go idle.
    function callback () {
      timerId = null;
      if (hasValue) {
        hasValue = false;
        sendValue();
      } else {
        delaying = false;
      }
    }

    var subscription = source.subscribe(
        onNext,
        observer.onError.bind(observer),
        function () {
          if (hasValue) {
            // Defer completion until the cached value has been flushed.
            complete = true;
          } else {
            observer.onCompleted();
          }
        }
      );

    // Tear down both the source subscription and any pending timer, so that
    // unsubscribing (or an error/completion) does not leave a timer running.
    return new Rx.CompositeDisposable(
      subscription,
      Rx.Disposable.create(function () {
        if (timerId !== null) {
          clearTimeout(timerId);
          timerId = null;
        }
      })
    );
  })
  .subscribe(log);
2
On

Here is Christopher's solution modified into an operator.

The throttleImmediate operator only stores the latest value from the source until the given selector completes. It fires the cached value, if existent, right after each completion. It is best suited to use when the selector has side effects (e.g. an animation).

// RxJS (the `rx` package) supplies Observable and the operator prototype.
var Rx = require('rx');

// Demo input: a counter that ticks every 10 ms and stops after 500 values.
var source = Rx.Observable.interval(10).take(500);

// Pre-bound logger so it can be handed directly to subscribe().
var log = console.log.bind(console);

/**
 * Projects each source value through `selector` one at a time. While the
 * observable returned by `selector` is still running, only the latest source
 * value is cached; it is projected as soon as the running one completes.
 * Values emitted by the selector's observable are forwarded to the result.
 *
 * The result completes once the source has completed AND the in-flight
 * selector observable (plus any cached value) has been drained.
 *
 * @param {function(*): Rx.Observable} selector Maps a source value to an
 *   observable whose completion marks the end of the throttling window.
 * @returns {Rx.Observable} The throttled sequence.
 */
Rx.Observable.prototype.throttleImmediate = function (selector) {
    var source = this;

    return Rx.Observable.create(function (observer) {

        var delaying = false,  // true while a selector observable is running
            hasValue = false,  // true when `value` arrived during that run
            complete = false,  // source finished while a run was in flight
            inner    = new Rx.SerialDisposable(), // current selector subscription
            value;             // latest value received from the source

        // Source emission: remember it, and project it unless one is running.
        function onNext (x) {
          value = x;
          if (delaying) {
            hasValue = true;
          } else {
            sendValue();
          }
        }

        // The running selector observable finished: flush the cached value,
        // or go idle and complete if the source has already ended.
        function onInnerCompleted () {
          if (hasValue) {
            hasValue = false;
            sendValue();
          } else {
            delaying = false;
            if (complete) {
              observer.onCompleted();
            }
          }
        }

        // Projects the cached value and subscribes to the resulting observable.
        function sendValue () {
          var projected,
              handle;

          delaying = true;

          try {
            projected = selector(value);
          } catch (e) {
            // Surface selector failures through the error channel.
            observer.onError(e);
            return;
          }

          // Register the handle before subscribing: if `projected` completes
          // synchronously and triggers a nested sendValue(), the nested run
          // replaces this handle instead of being clobbered by it afterwards.
          handle = new Rx.SingleAssignmentDisposable();
          inner.setDisposable(handle);
          handle.setDisposable(projected.subscribe(
            observer.onNext.bind(observer),
            observer.onError.bind(observer),
            onInnerCompleted
          ));
        }

        // Dispose the source and whichever selector observable is running.
        return new Rx.CompositeDisposable(
          source.subscribe(
            onNext,
            observer.onError.bind(observer),
            function () {
              if (delaying) {
                // A projection is still in flight (possibly with a cached
                // value behind it); completing now would drop its output.
                complete = true;
              } else {
                observer.onCompleted();
              }
            }
          ),
          inner
        );
      });
};

// Demo: each value that gets through is re-emitted after a delay chosen by
// its parity, and the result is logged.
source
  .throttleImmediate(function (data) {
    // Even values wait 500 ms, odd values wait 1000 ms.
    var delay = data % 2 === 0 ? 500 : 1000;

    return Rx.Observable.timer(delay).map(function () {
      return data;
    });
  })
  .subscribe(log);

This comes in handy when applying back pressure to a source where the length of the delay is known only to the selector.

Example: Given the question's marble diagram.

Let's suppose the source is a stream of ajax responses, ajaxPages, carrying HTML to display and triggered by clicks on a navbar. We want to render each page along with an entry animation, animatePage, whose duration is dynamic.

// animatePage is passed as the selector; subscribe() is called with no
// handlers, so the pipeline is run purely for the selector's side effects.
ajaxPages.throttleImmediate(animatePage).subscribe();

Here we animate the pages with the values from the source, skipping all the values that are emitted during the period of animation except the latest.

In practice, what we get is a stream that ignores clicks that are shortly followed by other clicks; these would be useless to show to the user, since the page would animate in and immediately animate out.