Cannot extract/unwrap value from concatenated Observable


I have this chain which should be concatenating 10 Observables into 1 Observable, where each of the 10 Observables should basically just be unwrapped to an integer:

const Rx = require('rxjs');

var i = 0;
const obs = Rx.Observable.interval(10)
  .map(() => i++)
  .map(val => Rx.Observable.create(obs => {
          obs.next(val)
  }))
  .take(10)
  .reduce((prev, curr) => {
      return prev.concat(curr);  // concat all observables
  })
  .last(val => val.flatMap(inner => inner));

// subscribe to Observable
obs.subscribe(v => {
  console.log('\n next (and only) result => \n', v);
});

What's happening is that all 10 Observables should be concatenated together, but I cannot extract the values from those 10 Observables (which have become 1 Observable). So my question is, how can I unwrap that final observable and extract the value?

Anyone know what I am talking about?


There are 3 best solutions below

0
On

This question has exactly the same problem as "Observable.prototype.concatAll does not seem to yield expected result":

.map(function(val){  // note: map *not* flatMap
  return Rx.Observable.create(obs => {
      obs.next(val)
  });
})

When you create your own observable with Rx.Observable.create, you need to call .complete() on it yourself. Operators that wait for completion, such as .reduce() or .concat(), stall on an observable that never completes; here, the .concat() inside your .reduce() never moves past the first inner observable.
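
To see why a missing complete() stalls concatenation, here is a minimal sketch in plain JavaScript. It does not use RxJS: create, concat and collect are hypothetical stand-ins written only for this illustration, and everything runs synchronously.

```javascript
// Minimal stand-in for an observable: an object with a subscribe(observer) method.
// This is NOT RxJS; create(), concat() and collect() are hypothetical helpers.
function create(subscribeFn) {
  return { subscribe: observer => subscribeFn(observer) };
}

// Subscribe to each source only after the previous one has completed.
function concat(...sources) {
  return create(observer => {
    let index = 0;
    const subscribeNext = () => {
      if (index === sources.length) {
        observer.complete();
        return;
      }
      sources[index++].subscribe({
        next: value => observer.next(value),
        complete: subscribeNext, // move on only when the current source completes
      });
    };
    subscribeNext();
  });
}

// Collect everything a source emits into an array and record whether it completed.
function collect(source) {
  const values = [];
  let completed = false;
  source.subscribe({
    next: value => values.push(value),
    complete: () => { completed = true; },
  });
  return { values, completed };
}

// Same shape as the code in the question: emits once, never completes.
const neverCompletes = val => create(obs => { obs.next(val); });
// Emits once and then completes.
const completes = val => create(obs => { obs.next(val); obs.complete(); });

const stalled = collect(concat(neverCompletes(0), neverCompletes(1), neverCompletes(2)));
const finished = collect(concat(completes(0), completes(1), completes(2)));

console.log(stalled);  // { values: [ 0 ], completed: false }
console.log(finished); // { values: [ 0, 1, 2 ], completed: true }
```

The stalled case only ever sees the first value, because the second source is never subscribed to.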

Furthermore, your use of the .last() operator is incorrect: it takes a predicate that filters the stream, and it emits the last value matching that predicate. It is also redundant, because your .reduce() only ever emits one value. Cleaning it up would lead to:

const obs = Rx.Observable.interval(10)
  .map(() => i++)
  .map(val => Rx.Observable.of(val))
  .take(10)
  .reduce((acc, curr) => acc.concat(curr))
  .flatMap(v => v)
  .toArray()

But you can shorten this by directly using the .concatMap() operator instead of map+take+reduce+flatMap:

const obs = Rx.Observable.interval(10)
  .map(() => i++)
  .concatMap(val => Rx.Observable.of(val))
  .take(10)
  .toArray()
3
On

Instead of reduce + last, you can use concatAll (or concatMap) to concatenate inner sequences and preserve order:

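const { Observable } = require('rxjs');

Observable
  .interval(0)
  .map(i => Observable.of(i))  // wrap each value in an inner observable that completes
  .take(10)
  .concatAll()                 // subscribe to the inner observables one at a time, in order
  .takeLast(1)                 // keep only the final value
  .subscribe(v => console.log('\n next (and only) result => \n', v));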

edit: takeLast(1) instead of finalValue()

10
On

Use the static (non-prototype) method, Rx.Observable.concat.

let range = [...Array(10).keys()];

// array of 10 observables of 10 values each
let ten = range.map(i => Rx.Observable.interval(1000).map(_ => i).take(10));
// spread the array: the static concat takes its sources as separate arguments
let sequence = Rx.Observable.concat(...ten); // all of them in sequence
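
One detail worth checking is how the array is passed. Assuming the static concat is variadic (it takes each source as a separate argument), an array of sources has to be spread with ...ten; otherwise the array itself is treated as the single source. Below is a minimal sketch of the difference in plain JavaScript, where concatSources is a hypothetical stand-in and each "source" is just an array, not an RxJS observable.

```javascript
// Hypothetical variadic helper standing in for a static concat(...sources).
// Each "source" is a plain array here; this is not RxJS.
function concatSources(...sources) {
  const out = [];
  for (const source of sources) {
    for (const value of source) out.push(value);
  }
  return out;
}

const range = [...Array(3).keys()];      // [0, 1, 2]
const sources = range.map(i => [i, i]);  // [[0, 0], [1, 1], [2, 2]]

// One argument: the outer array is the only source, so its items (the inner sources) come out.
const withoutSpread = concatSources(sources);
// Three arguments: each inner source is consumed in turn, so the values come out.
const withSpread = concatSources(...sources);

console.log(withoutSpread); // [ [ 0, 0 ], [ 1, 1 ], [ 2, 2 ] ]
console.log(withSpread);    // [ 0, 0, 1, 1, 2, 2 ]
```

Without the spread, the result contains the sources themselves rather than their values, which is the same symptom as receiving observables instead of integers.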