I have been trying to get stream of objects in a sequencial order, however concatMap is not working for me. With mergeMap I get the stream but no in a sequencial order.
Below is my code I am trying:
this.result
.pipe(
map((result: Result) => {
return result.contents
.map((content: Content) => this.databaseService
.getResource<Resource>('Type', content.key) // http call in service
)
}),
mergeMap((requests: Observable<Resource>[]) => {
return from(requests)
.pipe(
concatMap((resource: Observable<Resource>) => resource), // ----> Trigger only once.
filter((resource:Resource) => resource.status === 'active'),
skip(this.pageIndex * this.PAGE_SIZE),
take(this.PAGE_SIZE),
)
}),
)
.subscribe({
next: (resource: Resource) => {
console.log(resource) // resource stream
},
complete: () => console.log('completed')
});
concatMapwill only process one observable source at a time. It will not process subsequent sources until the current source completes.In your case the observable from
this.databaseService.getResource()is not completing. To force it to complete after the first emission, you could append atake(1)like this:Alternatively, if the call to
databaseService.getResource()is only meant to emit a single value, you could modify your service to emit only a single value.Note: If you are using the Angular HttpClient, a typical get request will complete when the response is received. So you can probably have your
getResource()method return the oservable from the http.get() call.