I have this chain which should be concatenating 10 Observables into 1 Observable, where each of the 10 Observables should basically just be unwrapped to an integer:
const Rx = require('rxjs');
var i = 0;
const obs = Rx.Observable.interval(10)
.map(() => i++)
.map(val => Rx.Observable.create(obs => {
obs.next(val)
}))
.take(10)
.reduce((prev, curr) => {
return prev.concat(curr); // concat all observables
})
.last(val => val.flatMap(inner => inner));
// subscribe to Observable
obs.subscribe(v => {
console.log('\n next (and only) result => \n', v);
});
What's happening is that all 10 Observables should be concatenated together, but I cannot extract the values from those 10 Observables (which have become 1 Observable). So my question is, how can I unwrap that final observable and extract the value?
Anyone know what I am talking about?
This question has the exact problem as in Observable.prototype.concatAll does not seem to yield expected result
When creating your own observable using
Rx.Observable.create
you need to.complete()
it yourself. Because you forgot to do so operators like.reduce()
will not work because they need to wait for completion before being able to run.Furthermore, your use of the
.last()
operator is incorrect; it takes a predicate on which your stream will be filtered and the last emission matching the predicate will be emitted. It is also a bit redundant because your.reduce()
will only emit one value. Cleaning it up would lead to:But you can shorten this by directly using the
.concatMap()
operator instead of map+take+reduce+flatMap: