Wait for chained observables to complete


I'm having trouble achieving what I want with RxJS 5. I have a simple Observable chain, started with Rx.Observable.interval:

const Rx = require('rxjs');

var i = 0;

const obs = Rx.Observable.interval(100)
    .flatMap(function () {
        return Rx.Observable.timer(Math.ceil(500*Math.random()))
            .map(function(val){
                console.log(' => These should all log first => ', val);
                return i++;
            });
    })
    .take(5)
    .merge()  // this doesn't seem to do what I want to do
    .map(function (val) {
        console.log('all done = > ', val);
    });

obs.subscribe();

The above logs this:

 => These should all log first =>  0
all done = >  0
 => These should all log first =>  0
all done = >  1
 => These should all log first =>  0
all done = >  2
 => These should all log first =>  0
all done = >  3
 => These should all log first =>  0
all done = >  4

I am looking to log this:

 => These should all log first =>  0
 => These should all log first =>  0
 => These should all log first =>  0
 => These should all log first =>  0
 => These should all log first =>  0

all done = >  [0,1,2,3,4]

The chain is clearly not waiting for all the timer observables to finish: "all done" is logged five times, interleaved with "These should all log first".

How can I get the output I am looking for?

Normally, we could use zip for this, but the API for zip does not fit this use case, because we don't have all the timer observables in one place at the same time!

If my question is not clear enough, here is an analog of what I want to do: we block on all the callbacks until we arbitrarily finish, by which point we have collected all the results:

const async = require('async');
var i = 0;

async.forever(function(cb){

    process.nextTick(function(){
       console.log('These should all log first');
       const err = i++ === 5;
       cb(err, i);
    });

}, function done(err, results){
    // let's pretend results contains all the i values
    console.log('all done');
});
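
The same "collect everything, then report once" behaviour can also be sketched with plain Promises, which avoids having to pretend that results are collected. This is only an illustration of the desired semantics, not an RxJS solution: `work` is a hypothetical stand-in for one asynchronous step, and unlike the stream case the list of work is known up front.

```javascript
// Hypothetical stand-in for one asynchronous unit of work; resolves with `n`.
function work(n) {
    return new Promise(function (resolve) {
        setTimeout(function () {
            console.log('These should all log first', n);
            resolve(n);
        }, Math.ceil(50 * Math.random()));
    });
}

// Start five units of work and wait until every one of them has resolved.
const allDone = Promise.all([0, 1, 2, 3, 4].map(work));

allDone.then(function (results) {
    // Promise.all keeps the input order, regardless of which timer fired first.
    console.log('all done', results);
});
```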

There are 3 best solutions below

So the requirement is:

  • Let the top level interval run N times.
  • Each interval should return an array of Y timer observables.
  • Next should trace out N times, each time with an array of Y observables.

This does that. It's a little naive, but it does work.

    const Rx = require('rxjs');

    // Number of timer observables to create on each interval tick.
    let timerArrLength = [ 4, 2, 3 ];
    let svc = Rx.Observable.interval(1000)
        .take(timerArrLength.length)
        .map(function ( index ) {
            // Build the array of timers for this tick (they are created here, not subscribed).
            let arr = [];
            for ( let i = 0; i < timerArrLength [ index ]; i++ ) {
                arr.push ( Rx.Observable.timer ( 1000 ) );
            }
            return arr;
        });

    svc.subscribe(
        function onNext(v){
            console.log('=> v =>',v);
        },
        function onError(e){
            console.error(e);
        },
        function onComplete(){
            console.log('complete');
        }
    );

Note that this basically yields the logging order that I was expecting to see, but I don't think this is the right/best way to do it:

    const {Observable} = require('rxjs');

    const obs = Observable.interval(100)
        .flatMap(function () {
            return Observable.timer(Math.ceil(500*Math.random()))
                .map(function(val){
                    console.log(' => These should all log first => ', val);
                });
        })
        .take(5)
        .takeLast(1)  // <<<<<<<<<< emits only the final value, and only once the source completes
        .map(function () {
            console.log('all done');
        });

    obs.subscribe();

I think there is a better way to achieve this, though.

@jonsharpe gave me this answer, which basically works. The problem can really be distilled down to just Rx.Observable.interval; we can get rid of the Rx.Observable.timer mapping. Here's our basic answer:

const Rx = require('rxjs');

const obs = Rx.Observable.interval(100)
    .take(5)
    .map(function(v){
        console.log(v);
        return v;
    })
    // reduce emits a single value, the accumulated array, once the source completes,
    // so no extra last() step is needed after it.
    .reduce(function (prev, curr) {
        return prev.concat(curr);
    },[])
    .map(function(v){
        console.log(v);
    });

obs.subscribe();

I would be very interested, however, in a way to do this without reduce.