I'm having some trouble achieving what I want with RxJS 5. I have a simple Observable chain, started with Rx.Observable.interval:
const Rx = require('rxjs');
var i = 0;
const obs = Rx.Observable.interval(100)
    .flatMap(function () {
        return Rx.Observable.timer(Math.ceil(500 * Math.random()))
            .map(function (val) {
                console.log(' => These should all log first => ', val);
                return i++;
            });
    })
    .take(5)
    .merge() // this doesn't seem to do what I want to do
    .map(function (val) {
        console.log('all done = > ', val);
    });
obs.subscribe();
The above logs this:
=> These should all log first => 0
all done = > 0
=> These should all log first => 0
all done = > 1
=> These should all log first => 0
all done = > 2
=> These should all log first => 0
all done = > 3
=> These should all log first => 0
all done = > 4
I am looking to log this:
=> These should all log first => 0
=> These should all log first => 0
=> These should all log first => 0
=> These should all log first => 0
=> These should all log first => 0
all done = > [0,1,2,3,4]
It's clear that we are not waiting for all the timer observables to finish, since "all done" is logged many times, interspersed with "These should all log first".
How can I get the output I am looking for?
Normally, we could use zip for this, but the API for zip does not fit this use case, because we don't have all the timer observables in one place at the same time!
If my question was not clear enough, here is an analog of what I want to do: we block on all callbacks until we arbitrarily finish, at which point we have collected all the results:
const async = require('async');
var i = 0;
async.forever(function (cb) {
    process.nextTick(function () {
        console.log('These should all log first');
        const err = i++ === 5;
        cb(err, i);
    });
}, function done(err, results) {
    // let's pretend results contains all the i values
    console.log('all done');
});
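To make the "pretend results contains all the i values" part concrete, here is a dependency-free sketch of the same idea. The helper `collectUntil` is a made-up name for illustration and is not part of the `async` library; it simply keeps calling the step function, stores each value, and calls `done` once with everything it collected.

```javascript
// Hypothetical helper (NOT part of the `async` library): calls `step`
// repeatedly on the next tick, collects each value it reports, and
// invokes `done` exactly once with the full list when `step` says stop.
function collectUntil(step, done) {
  const results = [];
  function next() {
    process.nextTick(function () {
      step(function (stop, value) {
        if (stop) {
          return done(null, results);
        }
        results.push(value);
        next();
      });
    });
  }
  next();
}

let i = 0;
collectUntil(function (cb) {
  console.log('These should all log first');
  const stop = i === 5; // arbitrary stopping condition
  cb(stop, i++);
}, function done(err, results) {
  // results is [0, 1, 2, 3, 4]
  console.log('all done', results);
});
```

The point is only that "all done" fires once, after every step has run, with the collected values attached.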
So the requirement is:
This does that. It's a little naive, but it does work.