RxJS chain can be triggered in parallel, but its internal logic should run synchronously (.next with concat)

Fields in my case can be finalized (actions linked to a field were executed). When they are done, I need to update 2 lists:

  • alreadyExecutedFields: string[] --> plain array
  • remainingFieldsToExecute: BehaviorSubject<string[]> --> behavior subject, because a .next needs to trigger other logic. This logic can be triggered in parallel, but I want to prevent that because it contains a splice that can then behave incorrectly (splicing an index that was already removed in the parallel chain).
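
To make the splice hazard described above concrete, here is a minimal sketch in plain TypeScript (no RxJS). The helper name removeInPlace and the sample values are assumptions for illustration only; it mirrors the splice(indexOf(...), 1) pattern and shows what happens when the same field is removed a second time.

```typescript
// Hypothetical helper (not from the original post): removes `field` in place,
// using the same splice(indexOf(...), 1) pattern as the question.
function removeInPlace(list: string[], field: string): string[] {
  list.splice(list.indexOf(field), 1);
  return list;
}

const remaining: string[] = ['a', 'b', 'c'];

// First removal behaves as expected: remaining is now ['a', 'c'].
removeInPlace(remaining, 'b');

// A second, overlapping removal of the same field: indexOf returns -1,
// and splice(-1, 1) deletes the LAST element instead of doing nothing.
// remaining is now ['a'].
removeInPlace(remaining, 'b');
```

So the list can silently lose an unrelated entry whenever the removal runs for a field that is no longer present.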

So when a field needs to be finalized, I call:

this.finalize$.next(field);

And the finalize$ chain looks like this:

this.finalize$.pipe(
        concatMap((field: string) => {
            return new Promise<void>((resolve) => {
                console.log('START', field);
                this.alreadyExecutedFields.push(field);
                const remainingFieldsToExecute = this.remainingFieldsToExecute$.value;
                remainingFieldsToExecute.splice(remainingFieldsToExecute.indexOf(field), 1);
                this.remainingFieldsToExecute$.next(remainingFieldsToExecute);
                console.log('END', field);
                resolve();
            });
        }),
    ).subscribe(() => { });

But for some reason, when 2 finalize$.next calls happen right after each other, the concatMap doesn't await the promise of the previous one. Even when I wrapped the END log and the resolve in a timeout, it didn't wait for the previous resolve.

What does work in my case is instead of using a concatMap, using a setInterval with a flag, which locks the part of the code where the lists are being updated.

But how can this be done in a better way? Or in a correct way with or without concat pipes.

Answer by shutsman:
  1. To modify alreadyExecutedFields, you can use the tap operator, which is intended for side effects.
  2. To read the current value of remainingFieldsToExecute$, you can use withLatestFrom.
  3. To then modify remainingFieldsToExecute$, you can use tap again.
  alreadyExecutedFields: string[] = [];
  remainingFieldsToExecute$: BehaviorSubject<string[]> = new BehaviorSubject<
    string[]
  >(['1', '2', '3', '4', '5']);
  finalize$ = new BehaviorSubject('1');

  ngOnInit() {
    this.finalize$
      .pipe(
        tap(field => this.alreadyExecutedFields.push(field)),
        withLatestFrom(this.remainingFieldsToExecute$),
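        tap(([field, remainings]) => {
          // Guard against a missing field: splice(-1, 1) would remove the last item.
          const index = remainings.indexOf(field);
          if (index !== -1) {
            remainings.splice(index, 1);
          }
          this.remainingFieldsToExecute$.next(remainings);
        })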
      )
      .subscribe(([_, data]) => {
        console.group();
        console.log('*** NEW EMIT ***');
        console.log('current field:', _);
        console.log('remainingFieldsToExecute', data);
        console.log('alreadyExecutedFields:', this.alreadyExecutedFields);
        console.groupEnd();
      });

    this.finalize$.next('2');
    this.finalize$.next('3');
  }

demo: https://stackblitz.com/edit/angular-ivy-ensh7w?file=src/app/app.component.ts

logs: [screenshot of the console output]
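
One possible variation on the tap step above: instead of splicing the array held by the BehaviorSubject in place, build a new array and emit that. This is only a sketch; withoutField is a hypothetical helper name, not part of the original answer or of RxJS.

```typescript
// Hypothetical helper (not in the original answer): returns a NEW array
// without `field` and leaves the input array untouched.
function withoutField(list: readonly string[], field: string): string[] {
  const index = list.indexOf(field);
  if (index === -1) {
    return [...list]; // field not present: return an unchanged copy
  }
  return [...list.slice(0, index), ...list.slice(index + 1)];
}

const before: string[] = ['1', '2', '3'];
const after = withoutField(before, '2');     // ['1', '3']
const unchanged = withoutField(after, '2');  // still ['1', '3']
```

Inside the pipeline this could be used as this.remainingFieldsToExecute$.next(withoutField(remainings, field)), so earlier subscribers never see their array change underneath them.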

It could be simplified to:


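    this.finalize$
      .pipe(
        mergeMap(field =>
          this.remainingFieldsToExecute$.pipe(
            // take(1) reads the current list once; without it, every later
            // emission of remainingFieldsToExecute$ would re-run this map.
            take(1),
            map(remFields => {
              this.alreadyExecutedFields.push(field);
              remFields.splice(remFields.indexOf(field), 1);
              return remFields;
            })
          )
        )
      )
      .subscribe();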
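
Since the question also asks for a way without concat pipes, here is a rough sketch of a promise-chain queue that could stand in for the setInterval-plus-flag lock. It is plain TypeScript without RxJS; SerialQueue, finalize, and the 10 ms delay are assumptions made for illustration, not code from the question or the answer.

```typescript
// Minimal serial queue: each task starts only after the previous one settled.
class SerialQueue {
  private tail: Promise<void> = Promise.resolve();

  enqueue(task: () => void | Promise<void>): Promise<void> {
    const run = this.tail.then(task);
    // Keep the chain usable even if a task rejects.
    this.tail = run.catch(() => undefined);
    return run;
  }
}

const alreadyExecutedFields: string[] = [];
let remainingFieldsToExecute: string[] = ['1', '2', '3'];
const log: string[] = [];
const queue = new SerialQueue();

// Hypothetical stand-in for this.finalize$.next(field).
function finalize(field: string): Promise<void> {
  return queue.enqueue(async () => {
    log.push(`START ${field}`);
    // Simulated asynchronous work inside the critical section.
    await new Promise<void>((resolve) => setTimeout(resolve, 10));
    alreadyExecutedFields.push(field);
    remainingFieldsToExecute = remainingFieldsToExecute.filter((f) => f !== field);
    log.push(`END ${field}`);
  });
}

// Two calls right after each other; the second waits for the first.
const done = Promise.all([finalize('2'), finalize('3')]);
```

Once done resolves, log holds START 2, END 2, START 3, END 3 in that order, so the two updates never interleave.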