I am trying to figure out how to write a rate limiter in RxJS, of the kind needed to access most APIs (Twitter, Facebook, etc.). If this is not supported by the out-of-the-box methods, I assume a scheduler could be written. For instance, highland.js has ratelimit. I don't want to drop any items, as window, sample, etc. would.
// Observable that pushes every entry of `requests` to the observer, in order,
// and then signals completion.
var source = Rx.Observable.create(function (observer) {
  // Forward a single queued request downstream.
  function emit(request) {
    observer.onNext(request);
  }

  // Teardown hook; any cleanup logic might go here.
  function onDispose() {
    console.log('disposed');
  }

  // queue of requests
  _.each(requests, emit);
  observer.onCompleted();

  return onDispose;
});
// what goes here, if built in (e.g. 2 requests per 2 seconds or 15 request per 15 minutes)
// SHOULD ONLY RUN
// Log every notification coming out of `source`.
var subscription = source.subscribe(
  function handleNext(x) {
    console.log('onNext: %s', x);
  },
  function handleError(e) {
    console.log('onError: %s', e);
  },
  function handleCompleted() {
    console.log('onCompleted');
  }
);
EDIT 1: Thinking about something like this, using the token bucket algorithm, still really rough but...
/**
 * Rate-limits an observable with a token bucket. Values are delayed, never
 * dropped, and are re-emitted in their original order.
 *
 * Every value costs one token. The bucket starts with `capacity` tokens and is
 * refilled continuously at `tokensPerInterval / interval` tokens per
 * millisecond, never exceeding `capacity`. When no token is available the
 * value is queued and a single timer is scheduled for the moment the next
 * token will exist.
 *
 * The bucket is created once per call to `tokenBucket`, so every subscription
 * to the returned observable draws from the same token budget.
 *
 * @param {Object} options
 * @param {number} [options.capacity] Maximum number of tokens the bucket can
 *   hold (the burst size). When missing or 0, no limit is applied and values
 *   pass straight through.
 * @param {number} options.tokensPerInterval Tokens added per `interval`.
 * @param {number} options.interval Refill period in milliseconds.
 * @param {Rx.Scheduler} [scheduler] Scheduler used to wait for the next token;
 *   defaults to `Rx.Scheduler.timeout`.
 * @returns {Rx.Observable} Observable emitting the source values at the
 *   limited rate, then forwarding the source's error or completion.
 */
Rx.Observable.prototype.tokenBucket = function(options, scheduler) {
  function time() {
    return new Date().getTime();
  }

  var source = this;
  var timerScheduler = scheduler || Rx.Scheduler.timeout;

  var bucket = {
    capacity: options.capacity || Infinity,
    left: options.capacity,
    last: time(),
    tokensPerInterval: options.tokensPerInterval,
    interval: options.interval
  };
  var unlimited = bucket.capacity === Infinity;
  // Milliseconds needed to earn one token.
  var msPerToken = bucket.interval / bucket.tokensPerInterval;

  // Credit the bucket with the tokens earned since the last refill.
  function refill() {
    var now = time();
    // Guard against the wall clock moving backwards.
    var elapsedMs = Math.max(now - bucket.last, 0);
    bucket.last = now;
    bucket.left = Math.min(bucket.left + elapsedMs / msPerToken, bucket.capacity);
  }

  return Rx.Observable.create(function(observer) {
    var pending = [];            // values waiting for a token, oldest first
    var sourceCompleted = false;
    var busy = false;            // true while draining or while a timer is outstanding
    var timer = new Rx.SerialDisposable();

    // Emit as many queued values as the bucket allows; if it runs dry,
    // schedule exactly one wake-up for when the next token is due.
    function drain() {
      busy = true;
      while (pending.length > 0) {
        if (!unlimited) {
          refill();
          if (bucket.left < 1) {
            // Time until the missing fraction of a token has dripped in.
            var waitMs = Math.ceil((1 - bucket.left) * msPerToken);
            timer.setDisposable(timerScheduler.scheduleWithRelative(waitMs, drain));
            return; // stay busy until the timer fires
          }
          bucket.left -= 1;
        }
        observer.onNext(pending.shift());
      }
      busy = false;
      // Completion is held back until every queued value has been delivered.
      if (sourceCompleted) {
        observer.onCompleted();
      }
    }

    var subscription = source.subscribe(
      function(value) {
        pending.push(value);
        if (!busy) {
          drain();
        }
      },
      function(error) {
        // Errors are forwarded immediately; values still queued are discarded.
        timer.dispose();
        pending.length = 0;
        observer.onError(error);
      },
      function() {
        sourceCompleted = true;
        if (!busy) {
          drain();
        }
      }
    );

    return function() {
      subscription.dispose();
      // Cancel any outstanding wake-up so nothing fires after unsubscribe.
      timer.dispose();
      pending.length = 0;
    };
  });
};
// Reference instant that addToDom measures elapsed time against.
var start = moment();

// Twenty integers pushed through a bucket configured for 2 tokens per 2000 ms.
var source = Rx.Observable
  .range(1, 20)
  .tokenBucket({
    capacity: 2,
    tokensPerInterval: 2,
    interval: 2000
  });

// Log every notification and render each value into the page.
var subscription = source.subscribe(
  function handleNext(x) {
    console.log('onNext: %s', x);
    addToDom(x);
  },
  function handleError(e) {
    console.log('onError: %s', e);
  },
  function handleCompleted() {
    console.log('onCompleted');
  }
);
/**
 * Appends a list item to the element with id "c", showing the value and the
 * whole seconds between `start` and now.
 *
 * @param {*} x Value to display.
 */
function addToDom(x) {
  var list = document.getElementById('c');
  var item = document.createElement('li');
  var label = x + ' - ';
  label += moment().diff(start, 'seconds') + 's ago';
  item.innerHTML = label;
  list.appendChild(item);
}
<!-- Moment.js: the demo script calls moment() to compute elapsed seconds. -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.10.3/moment.min.js"></script>
<!-- RxJS 2.5.3 full bundle: supplies the Rx global the demo script relies on. -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.3/rx.all.js"></script>
<!-- Target list: addToDom looks this element up by id "c" and appends an <li> per value. -->
<ul id="c"></ul>
If you just want to drop the events that occur in between you can use windowWithTimeOrCount + throttleFirst:
Working example (output in console):
Alternative 1
If you don't want to drop any values, you can also use `controlled` on your pipeline along with a specially rolled `regulate` method:
Working example (output in console):