I am trying to create a custom observable operator, "write", that can be chained onto an observable stream and outputs the contents of that stream.
The following code works:
    RX.Observable.write = function () {
        return RX.Observable.create(function (observer) {
            try {
                observer.onNext("this is a test message");
                observer.onCompleted();
            } catch (exception) {
                observer.onError(exception);
            }
        });
    };

    var observable = RX.Observable.write();
    var subscription = observable.subscribe(function (x) {
        console.log(x);
    });
But the following code, which adds "range(0, 5)" to the chain, does not work and throws an exception:
    var observable = RX.Observable.range(0,5).write();
                                              ^
    TypeError: Object #<RangeObservable> has no method 'write'
        at Object.<anonymous> (/Users/ericbroda/Development/rxstream/index.js:15:43)
        at Module._compile (module.js:456:26)
        at Object.Module._extensions..js (module.js:474:10)
        at Module.load (module.js:356:32)
        at Function.Module._load (module.js:312:12)
        at Function.Module.runMain (module.js:497:10)
        at startup (node.js:119:16)
        at node.js:906:3
There is probably something simple that I am missing, but it escapes me right now.
Any help is appreciated.
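The error message is consistent with "write" having been attached to the RX.Observable constructor (a static method) rather than to its prototype, so instances such as the one returned by range(0,5) never see it. The sketch below illustrates that difference with a tiny stand-in type. MiniObservable, writeStatic, and the sink argument are hypothetical names made up for illustration; this is not the RX implementation. With the real library, the equivalent change would presumably be to assign the operator to RX.Observable.prototype.write and subscribe to "this" inside it.

```javascript
// Minimal stand-in for an observable type (hypothetical, NOT the RX implementation).
function MiniObservable(subscribeFn) {
  this._subscribe = subscribeFn;
}

MiniObservable.prototype.subscribe = function (onNext, onError, onCompleted) {
  return this._subscribe({
    onNext: onNext || function () {},
    onError: onError || function (err) { throw err; },
    onCompleted: onCompleted || function () {}
  });
};

// Static factory: lives on the constructor, in the same way as RX.Observable.range.
MiniObservable.range = function (start, count) {
  return new MiniObservable(function (observer) {
    for (var i = start; i < start + count; i++) {
      observer.onNext(i);
    }
    observer.onCompleted();
  });
};

// Static method: reachable only as MiniObservable.writeStatic(), never on instances.
MiniObservable.writeStatic = function () {
  return new MiniObservable(function (observer) {
    observer.onNext("this is a test message");
    observer.onCompleted();
  });
};

// Prototype method: every instance inherits it, so it can be chained.
// `sink` is an assumed parameter; it defaults to console.log.
MiniObservable.prototype.write = function (sink) {
  var source = this;
  var output = sink || console.log;
  return new MiniObservable(function (observer) {
    return source.subscribe(
      function (x) { output(x); observer.onNext(x); }, // emit, then pass along
      function (err) { observer.onError(err); },
      function () { observer.onCompleted(); }
    );
  });
};

// Usage: chaining works because write is found on the prototype.
var written = [];
var completed = false;
MiniObservable.range(0, 5)
  .write(function (x) { written.push(x); })
  .subscribe(null, null, function () { completed = true; });
```

Here writeStatic mirrors the working call in the question (invoked directly on the constructor), while write mirrors what a chainable operator needs: it wraps the upstream observable and forwards every notification.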
For completeness, I wanted to post the final solution I worked out. I can now write an observable to a node stream using the RX observable pattern:
The "writable" observable above can be used as shown below. The example is a node app that writes the observable stream to node's stdout, but any stream, such as a file, can be used (note: you will need to handle encoding appropriately). This makes it very simple to write observables using the regular RX observable pattern.
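As a rough illustration of the idea, and not the solution referred to above, the sketch below forwards each value from an observable-like source to a node writable stream. The helper name writeTo, its encoding parameter, and the hand-rolled source object are all assumptions made for this example; only stream.Writable and process.stdout come from node itself. Any object exposing subscribe(onNext, onError, onCompleted) should fit in place of the stand-in source.

```javascript
var stream = require("stream");

// Hypothetical helper: subscribes to `source` and writes each value,
// one per line, to the given node writable stream.
function writeTo(source, writable, encoding) {
  return source.subscribe(
    function (x) {
      writable.write(String(x) + "\n", encoding || "utf8");
    },
    function (err) {
      writable.emit("error", err);
    },
    function () {
      // The process-wide stdout/stderr streams are left open; other streams are ended.
      if (writable !== process.stdout && writable !== process.stderr) {
        writable.end();
      }
    }
  );
}

// Stand-in source that emits 0..4 synchronously (NOT an RX observable).
var source = {
  subscribe: function (onNext, onError, onCompleted) {
    for (var i = 0; i < 5; i++) {
      onNext(i);
    }
    onCompleted();
  }
};

// A small in-memory writable, so the example does not touch the file system.
var chunks = [];
var memorySink = new stream.Writable({
  write: function (chunk, enc, callback) {
    chunks.push(chunk.toString("utf8"));
    callback();
  }
});

writeTo(source, memorySink);

// To print to the terminal instead: writeTo(source, process.stdout);
```

Values are converted with String() before writing, which is one simple way to handle the encoding concern mentioned above; binary data or object streams would need a different conversion.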