rxjs mongo stream backpressure


Lately I've been having backpressure problems with RxJS when consuming a Mongo cursor readable. The task is basically:

  1. Query DB A (Mongo), use its readable and convert it to an observable
  2. Perform various async transformations to the input stream, e.g.

    2.1 Query another DB B (Mongo, whatever) for additional data for an item (or a batch of items), then perform (possibly async) operations on it

    2.2 Query another DB B (Mongo, whatever) for additional data for an item (or a batch of items), then filter the stream based on that data

  3. Count the Input stream and compare it to the output stream (as filtering can occur)
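Setting rxjs aside for a moment, the three steps above can be sketched with plain async iteration (a Mongo cursor is itself async-iterable). `queryDbA`, `lookupInDbB`, and the filter predicate are hypothetical stand-ins:

```javascript
// Hypothetical stand-in for the DB A cursor (step 1): any async
// iterable works here, e.g. a real Mongo cursor.
async function* queryDbA() {
  for (let i = 0; i < 10; i++) yield { id: i };
}

// Hypothetical DB B lookup (steps 2.1/2.2): enrich an item, possibly async.
async function lookupInDbB(item) {
  return { ...item, extra: item.id * 2 };
}

async function run() {
  let inCount = 0;
  let outCount = 0;
  const results = [];
  for await (const item of queryDbA()) {
    inCount++;                                // step 3: count the input
    const enriched = await lookupInDbB(item); // step 2.1: enrich with DB B data
    if (enriched.extra % 4 === 0) continue;   // step 2.2: filter on that data
    outCount++;                               // step 3: count the output
    results.push(enriched);
  }
  return { inCount, outCount, results };
}
```

The rxjs equivalent would be roughly `from(cursor)` piped through `mergeMap` (enrich), `filter`, and two counters, but the `for await` form makes the pull semantics explicit.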

Although I do have a solution for each of these steps, I'm dealing with backpressure from the very first Mongo query, as steps 2.1/2.2 can be very time-intensive while the initial query is much faster (i.e. the consumers are slow and the producer is fast).

Right now, my only solution is to limit step 1 to a reasonably sized batch to reduce the initial throughput, but this obviously slows down the whole transformation chain (the producer sits idle for long periods until the chain has finished processing each batch).
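That workaround amounts to a batch-and-drain loop: fetch a slice, fully process it, fetch the next. A minimal sketch, where `fetchBatch` and `transform` are hypothetical (in Mongo terms, `fetchBatch` would be something like `cursor.skip(offset).limit(size)`):

```javascript
// Hypothetical batched producer: return up to `size` items starting at `offset`.
async function fetchBatch(offset, size) {
  const total = 10; // pretend the collection has 10 documents
  const batch = [];
  for (let i = offset; i < Math.min(offset + size, total); i++) batch.push(i);
  return batch;
}

// Hypothetical slow consumer stage.
async function transform(x) {
  return x * 2;
}

// Batch-and-drain: the producer waits until the whole batch is
// transformed before issuing the next query. Simple, but the producer
// idles during every drain phase -- the slowdown described above.
async function runBatched(size) {
  const out = [];
  for (let offset = 0; ; offset += size) {
    const batch = await fetchBatch(offset, size);
    if (batch.length === 0) break;
    out.push(...(await Promise.all(batch.map(transform))));
  }
  return out;
}
```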

How do I realize a transformation chain, which is always busy, but doesn't suffer from backpressure?

My current idea is to slow the producer down dynamically based on the current (or projected) performance of its consumers (e.g. the time taken per transformation), but that doesn't feel like the right approach to me.
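Rather than measuring consumer speed explicitly, one option is to make the pipeline pull-based with a fixed cap on in-flight work: the producer only advances when a slot frees up, so it automatically runs at the consumers' pace while up to `limit` workers stay busy. A sketch, assuming `source` is any async iterable (a Mongo cursor qualifies) and `transform` is a hypothetical async consumer stage:

```javascript
// Cap in-flight transformations at `limit`. When the cap is reached,
// the loop awaits the next completion before pulling another item
// from the source -- this pause IS the backpressure signal.
async function mapBounded(source, transform, limit) {
  const results = [];
  const inFlight = new Set();
  for await (const item of source) {
    const p = transform(item).then((r) => {
      results.push(r);
      inFlight.delete(p);
    });
    inFlight.add(p);
    if (inFlight.size >= limit) await Promise.race(inFlight); // backpressure
  }
  await Promise.all(inFlight); // drain the tail
  return results;
}
```

Note that rxjs's `mergeMap(project, concurrency)` caps concurrent projections but buffers overflow emissions internally rather than pausing the source, so with a push-based source the cursor can still race ahead; keeping the source pull-based (async iteration, or pausing the readable) is what actually slows the producer down.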
