I am having trouble figuring out how to access the source observable in this scheme (I'm trying to figure out how to do this without modifying Rx.Observable.prototype):
q.drain()
  .flatMap(function(val){
    return q.backpressure(val, function(cb){
      setTimeout(cb, 1000);
    });
  });
We call backpressure as a method on the Queue prototype:
Queue.prototype.backpressure = function(val, fn){
  const source = ? // I don't know how to access the source observable...
  return Rx.Observable.create(sub => {
    return source.subscribe(val => {
        fn.call(source, val, function(err, val){
          if(err){
            sub.error(err);
          }
          else{
            sub.next(val);
          }
        });
      },
      // be sure to handle errors and completions as appropriate and
      // send them along
      err => sub.error(err),
      () => sub.complete());
  });
};
But the problem is that I don't know whether I can access the source observable in this scheme. The correct value for source is certainly not the this value inside the prototype method, because that belongs to the queue instance. My only hope, I think, is to somehow pass the source observable directly into the backpressure method. Does anyone know how I can do this? I don't mind putting this function elsewhere; it doesn't have to be a method on the queue, but I think the same problem will exist either way.
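To make the "pass the source in directly" idea concrete, here is a rough sketch of a free function that takes the source as its first argument, so this is never consulted. It is only an illustration under some assumptions: createObservable is a hypothetical stand-in so the snippet runs without RxJS (with RxJS you would use Rx.Observable.create as in the code above), and fn is assumed to have the shape (val, cb), which is not quite how it is called in my snippet.

```javascript
// Hypothetical minimal stand-in for an observable, so the sketch runs
// without RxJS. With RxJS, Rx.Observable.create would take its place.
function createObservable(subscribeFn) {
  return {
    subscribe(next, error, complete) {
      return subscribeFn({
        next: next || (() => {}),
        error: error || (() => {}),
        complete: complete || (() => {}),
      });
    },
  };
}

// Free function: the source is an explicit argument, so there is no need
// to recover it from `this`.
function backpressure(source, fn) {
  return createObservable(sub => {
    return source.subscribe(
      val => {
        // Hand each value to fn; forward the result only once fn calls back.
        fn(val, (err, result) => {
          if (err) {
            sub.error(err);
          } else {
            sub.next(result);
          }
        });
      },
      // Pass errors and completion along unchanged.
      err => sub.error(err),
      () => sub.complete()
    );
  });
}

// Usage: a source emitting 1, 2, 3; fn passes each value straight through.
const source = createObservable(sub => {
  [1, 2, 3].forEach(v => sub.next(v));
  sub.complete();
});

const received = [];
let completed = false;
backpressure(source, (val, cb) => cb(null, val)).subscribe(
  v => received.push(v),
  err => console.error(err),
  () => { completed = true; }
);
console.log(received);
```

Note that this sketch forwards completion as soon as the source completes, so with an asynchronous fn (for example one using setTimeout) completion could be signalled before pending callbacks have fired.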
If it helps, the value of this inside the flatMap function (if you use a regular function instead of an arrow function) is a MergeMapSubscriber object.
However, after experimenting, I don't believe that the MergeMapSubscriber value is the one I want to use as my source; to my knowledge, my source should be an Observable, not a Subscriber.
Have you thought about putting it on the Observable prototype? Then for queue: