RxJS: execute concatMap in parallel


Is it possible to execute a higher-order observable in parallel, but still preserve the order when merging the results?

I have something looking like this:

invoker$: Observable<void>;
fetch: (index: number) => Observable<T[]>;

invoker$
  .pipe(
    concatMap((_, index) => fetch(index)),
    scan((acc: T[], values) => [...acc, ...values], [])
  )
  .subscribe(/* Do something with the array */);

The idea is to have an observable that invokes a callback (e.g. a backend call that takes a considerable amount of time), generating a new observable that emits a single value (an array of some generic type). The returned arrays should be concatenated into another array while preserving their original fetch order.

I would, however, like the requests to be fired in parallel. So if invoker$ emits rapidly, the requests are made in parallel and the results are merged as they complete.

My understanding is that concatMap waits for one inner observable to complete before subscribing to the next one, while mergeMap runs them in parallel but does nothing to preserve the order.

3 Answers

BEST ANSWER

Seems like this behavior is provided by the concatMapEager operator from the cartant/rxjs-etc library - written by Nicholas Jamieson (cartant) who's a developer on the core RxJS team.
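To make the semantics concrete, here is a minimal sketch of the "eager concat" idea in plain TypeScript with Promises (the names here are illustrative, not rxjs-etc's actual API): every task is started immediately, so the work runs in parallel, but results are delivered strictly in input order.

```typescript
// Start all tasks up front (parallel execution), but hand results to the
// consumer in the order the tasks were given, not in completion order.
async function eagerConcat<T>(
  tasks: Array<() => Promise<T>>,
  onResult: (value: T) => void
): Promise<void> {
  // Kick off every task immediately, so they all run concurrently.
  const inFlight = tasks.map((task) => task());
  // Await them in input order; a fast later task simply waits its turn.
  for (const promise of inFlight) {
    onResult(await promise);
  }
}
```

A later task that finishes early is not lost; its settled promise is simply consumed when its turn in the sequence comes up, which is the same trade-off concatMapEager makes.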

Answer

I believe that the operator you are looking for is forkJoin. This operator takes a list (or dictionary) of observables, subscribes to them in parallel, and emits the last value of each observable once they have all completed, keeping the declaration order.

forkJoin({
   invoker: invoker$,
   fetch: fetch$,
})
.subscribe(({invoker, fetch}) => {
   console.log(invoker, fetch);
});
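The join semantics here mirror Promise.all, which may be a familiar reference point: inputs run in parallel, yet the combined result keeps the declaration order regardless of which call finishes first. A small plain-TypeScript sketch (the helper names are made up for illustration):

```typescript
// Resolve a value after a given delay, standing in for a backend call.
const delay = <T>(ms: number, value: T): Promise<T> =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

async function fetchAllInOrder(): Promise<string[]> {
  // The slowest request is listed first; Promise.all (like forkJoin)
  // still reports its result first, in declaration order.
  return Promise.all([
    delay(30, "first"),
    delay(10, "second"),
    delay(20, "third"),
  ]);
}
```

Note that, like Promise.all, forkJoin emits only once, after every input completes, so it fits a fixed set of requests rather than an ongoing stream of invocations.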
Answer

You can do it using mergeMap.

First, you need to pass the index together with the async response down the stream, so that the results can later be sorted by that index. Then you have two choices (plus a variant of the second for a dynamic batch size).

Here is some pseudo-code for each case:

  1. In the reduce case, the stream ends once all requests are sent:

     invoker$
      .pipe(
       mergeMap((_, index) =>
          fetch(index).pipe(map(value => ({ value, index })))),
       reduce((acc: { value: T[]; index: number }[], singleValue) =>
          [...acc, singleValue], []),
       map(array =>
          array
             .sort((a, b) => a.index - b.index)
             .flatMap(valueWithIndex => valueWithIndex.value))
      )
      .subscribe(/* Do something with the array */);

  2. In the multiple-batch case, I am assuming the size of the batch to be constant:

     invoker$
      .pipe(
       mergeMap((_, index) =>
          fetch(index).pipe(map(value => ({ value, index })))),
       scan((acc: { value: T[]; index: number }[], singleValue) => {
           let resp = [...acc, singleValue];
           // The scan can accumulate more than the batch size,
           // so we need to limit it and restart for the new batch
           if (resp.length > BATCH_SIZE) {
              resp = [singleValue];
           }

           return resp;
       }, []),
       filter(array => array.length === BATCH_SIZE),
       map(array =>
          array
             .sort((a, b) => a.index - b.index)
             .flatMap(valueWithIndex => valueWithIndex.value))
      )
      .subscribe(/* Do something with the array */);


2.1. In case the batch size is dynamic:

    invoker$
     .pipe(
      mergeMap((_, index) =>
         fetch(index).pipe(map(value => ({ value, index })))),
      withLatestFrom(batchSizeStream),
      scan((acc: [{ value: T[]; index: number }[], number],
            [singleValue, batchSize]) => {
          let resp: typeof acc = [[...acc[0], singleValue], batchSize];
          // The scan can accumulate more than the batch size,
          // so we need to limit it and restart for the new batch
          // NOTE: the batch size is dynamic and we do not want to drop data
          // once the buffer size changes, so we need to drop the buffer
          // only if the batch size did not change
          if (resp[0].length > batchSize && acc[1] === batchSize) {
             resp = [[singleValue], batchSize];
          }

          return resp;
      }, [[], 0]),
      filter(arrayWithBatchSize =>
        arrayWithBatchSize[0].length >= arrayWithBatchSize[1]),
      map(arrayWithBatchSize =>
        arrayWithBatchSize[0]
           .sort((a, b) => a.index - b.index)
           .flatMap(valueWithIndex => valueWithIndex.value))
     )
     .subscribe(/* Do something with the array */);
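The core trick in all of the variants above (tag each response with the index it was fired with, accumulate, sort by index, flatten) can be checked in isolation without RxJS. A plain-TypeScript sketch, with illustrative names:

```typescript
// A response tagged with the position of the request that produced it.
interface Tagged<T> {
  value: T[];    // one fetched batch (the question's Observable<T[]> payload)
  index: number; // position in the original fetch sequence
}

// Responses arrive in completion order; sorting by the stored index and
// flattening restores the original fetch order.
function mergeInFetchOrder<T>(responses: Tagged<T>[]): T[] {
  return [...responses]
    .sort((a, b) => a.index - b.index)
    .reduce<T[]>((acc, r) => acc.concat(r.value), []);
}
```

Copying the array before sorting keeps the function pure, which matters inside a scan where the accumulator is reused across emissions.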

EDIT: optimized sorting, added dynamic batch size case