In RxJs, How do I execute a sequence inside a mergemap while returning the array of results in order?

1.1k Views Asked by At

I have a function that can break down a message into multiple message chunks. I need these messages to be posted in order to my post function. However I do not want the Observable to block other posts that are incoming. My solution would be in some combination of of the concat operator inside a mergemap but I cannot seem to get it to work properly

I am not sure I can make a diagram but here is my attempt:

-1-2------3|->
--4--5--6|->
desired output:
[4,5,6]
[1,2,3]

I need request 1 to execute before 2 before 3 and 4 before 5 and before 6. In English I think I would have an observable of observables and I want that to map into observable streams and then map to a standard array for each observable output stream. I am just not sure how to do this exactly. I've been messing around with the code for a long time trying to conceptualize what I just stated and here is my best attempt:

    interface SendInfo {
        message: discord.Message
        content: string
        options?: discord.MessageOptions
    }
    export const sendMessage$: Subject<SendInfo> = new Subject();

    const regex = /[\s\S]{1,1980}(?:\n|$)/g;
    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | discord.Message[] | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const superObservable: Observable<Observable<discord.Message | discord.Message[] | null>> = concat(chunks.map(
                    (chunk: string):
                    Observable<discord.Message | discord.Message[] | null> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        );
                    }
                ));

                return superObservable.pipe(
                    mergeMap(e => e),
                    toArray(),
                );
            }
        ),
        tap((e): void => Utils.logger.fatal(e)),
        share(),
    );

My output:

[2019-10-21T17:24:15.322] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg1' } ]
[2019-10-21T17:24:15.324] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg2' } ]
[2019-10-21T17:24:15.325] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg3' } ]

I feel like I'm close to a solution but I cannot figure out how to exactly merge this into a single array. I also don't know if it is functionally correct or not.

2

There are 2 best solutions below

0
On BEST ANSWER

I figured out the operator I was looking for was combineAll() and not toArray() after a concat statement. I have another implementation as well with promises. Now I believe both of these should work but I will post the one I'm more sure of first which is the promises.

Implementation one using promises:

    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
                    (chunk: string):
                    Observable<(discord.Message | null)[]> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        // eslint-disable-next-line max-len
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        ).pipe(
                            // eslint-disable-next-line comma-dangle
                            map((x): (discord.Message | null)[] => [x].flatMap(
                                (t): (discord.Message | discord.Message[] | null) => t
                            ))
                        );
                    }
                );

                const promises = observables
                    .map(
                        (obs: Observable<(discord.Message | null)[]>):
                        Promise<(discord.Message | null)[]> => obs.toPromise()
                    );

                const reduced = promises
                    .reduce(async (promiseChain, currentTask):
                    Promise<(discord.Message | null)[]> => [
                        ...await promiseChain,
                        ...await currentTask,
                    ].flatMap((x): (discord.Message | null) => x));

                return from(reduced);
            }
        ),
        share(),
    );

Implementation two pure RxJs:

    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
                    (chunk: string):
                    Observable<(discord.Message | null)[]> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        // eslint-disable-next-line max-len
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        ).pipe(
                            // eslint-disable-next-line comma-dangle
                            map((x): (discord.Message | null)[] => [x].flatMap(
                                (t): (discord.Message | discord.Message[] | null) => t
                            ))
                        );
                    }
                );

                return concat(observables).pipe(
                    combineAll(),
                    map(x => x.flatMap(t => t)),
                );
            }
        ),
        share(),
    );

I believe the promises ones will work because it's reduced into a explicit chain. I am not sure if I am missing some nuance with the concat operator so I'm not 100% sure it will work. The test-bench I wrote is actually not working properly due to a misunderstanding of how promises execute with the defer operator in RxJs but I am getting the expected order according to my bench. I believe my misunderstanding was the reason I didn't come up with these solutions easily.

[2019-10-23T06:09:13.948] [FATAL] messageWrapper.ts:109 - [
  { channel: { send: [Function] }, content: 'msg7' },
  { channel: { send: [Function] }, content: 'msg8' },
  { channel: { send: [Function] }, content: 'msg9' }
]
[2019-10-23T06:09:14.243] [FATAL] messageWrapper.ts:109 - [
  { channel: { send: [Function] }, content: 'msg4' },
  { channel: { send: [Function] }, content: 'msg5' },
  { channel: { send: [Function] }, content: 'msg6' }
]
[2019-10-23T06:09:14.640] [FATAL] messageWrapper.ts:109 - [
  { channel: { send: [Function] }, content: 'msg1' },
  { channel: { send: [Function] }, content: 'msg2' },
  { channel: { send: [Function] }, content: 'msg3' }
]
      ✓ should execute concurrently. (753ms)
9
On

I have tackled the preserving order thing before in this question RxJS: MergeMap with Preserving Input order

So using my parallelExecute you could then reduce the values to an array.

parallelExecute(...yourObservables).pipe(
  reduce((results, item) => [...results, item], [])
);

here is the parallelExecute function.

const { BehaviorSubject, Subject } = rxjs;
const { filter } = rxjs.operators;

const parallelExecute = (...obs$) => {
  const subjects = obs$.map(o$ => {
    const subject$ = new BehaviorSubject();
    const sub = o$.subscribe(o => { subject$.next(o); });
    return { sub: sub, obs$: subject$.pipe(filter(val => val)) };
  });
  const subject$ = new Subject();
  sub(0);
  function sub(index) {
    const current = subjects[index];
    current.obs$.subscribe(c => {
      subject$.next(c);
      current.obs$.complete();
      current.sub.unsubscribe();
      if (index < subjects.length -1) {
        sub(index + 1);
      } else {
        subject$.complete();
      }
    });
  }
  return subject$;
}