Custom observable not able to be chained

480 Views Asked by At

I am trying to create a custom observable operator, "write", that can be chained onto an observable stream and will output the contents of that stream.

The following code works:

/**
 * Static factory attached to RX.Observable itself (not to its prototype).
 * The observable it returns pushes a single fixed test message to each
 * subscriber and then signals completion; anything thrown while doing so
 * is forwarded to the subscriber's error handler.
 */
RX.Observable.write = function () {
    // Subscription logic: notify the given observer, routing exceptions to onError.
    function emitTestMessage(target) {
        try {
            target.onNext("this is a test message");
            target.onCompleted();
        } catch (err) {
            target.onError(err);
        }
    }

    return RX.Observable.create(emitTestMessage);
};

// Build the observable through the static factory, then log every value it emits.
var observable = RX.Observable.write();
var subscription = observable.subscribe(function (value) {
    console.log(value);
});

But the following code, with "range(0, 5)" added in front of the call, does not work and throws an exception:

var observable = RX.Observable.range(0,5).write();  
                                          ^
TypeError: Object #<RangeObservable> has no method 'write'
    at Object.<anonymous> (/Users/ericbroda/Development/rxstream/index.js:15:43)
    at Module._compile (module.js:456:26)
    at Object.Module._extensions..js (module.js:474:10)
    at Module.load (module.js:356:32)
    at Function.Module._load (module.js:312:12)
    at Function.Module.runMain (module.js:497:10)
    at startup (node.js:119:16)
    at node.js:906:3

There is probably a simple thing that I am missing but it escapes me right now.

Any help is appreciated.

2

There are 2 best solutions below

0
On BEST ANSWER

For completeness, I wanted to post the final solution I worked out -- I now can write an observable to a node stream using the RX observable pattern:

var RX = require("rx");
var util = require('util');

/**
 * Pass-through operator: writes every element of the source observable to
 * `stream` and re-emits the element unchanged to downstream subscribers.
 * Errors and completion from the source are forwarded as-is.
 *
 * @param {Object} stream - Destination whose write(chunk, encoding, callback)
 *     method is called once per element (e.g. a node writable stream).
 * @param {string} [encoding] - Passed through unchanged to stream.write.
 * @param {Function} [cb] - Optional. Called as cb(x, err) from the stream's
 *     own write callback, where x is the element that was written and err is
 *     whatever the stream passed to that callback.
 * @returns {Observable} An observable mirroring the source.
 */
RX.Observable.prototype.write = function(stream, encoding, cb) {

    var source = this;

    return new RX.AnonymousObservable(function(observer) {

        // The subscription to the source is returned as this observable's
        // disposable, so no separate dispose function is needed.
        return source.subscribe(
            function(x) {
                observer.onNext(x);
                // Hand stream.write a callback instead of invoking cb eagerly;
                // the previous `cb(data)` referenced an undeclared variable.
                var onWritten = typeof cb === 'function'
                    ? function(err) { cb(x, err); }
                    : undefined;
                stream.write(x, encoding, onWritten);
            },
            function(e) {
                observer.onError(e);
            },
            function() {
                observer.onCompleted();
            });
    });
};

The "writable" observable above can be used as shown below. The example is a node app that writes the observable stream to node's stdout for illustration purposes, but any stream, such as a file, can be used (note: you will need to handle encoding appropriately). This makes it very simple to write observables using the regular RX observable pattern.

var RX = require("rx");
var util = require('util');

// Chain the custom operator onto a range of five integers, targeting stdout.
// The function is handed to write() as its third (callback) argument.
var observable = RX.Observable
    .range(0, 5)
    .write(process.stdout, 'utf8', function (x) {
        console.log("stream completion callback called, x: %s", x);
    });

// Subscribing starts the stream; each element that reaches the subscriber is dumped.
var subscription = observable.subscribe(function (value) {
    console.dir(value);
});
0
On

You are placing write on the Rx.Observable object, essentially creating a static method. If you want it to work in the middle of a chain you need to add it to the prototype 'Rx.Observable.prototype.write'.

Even then, though, this will probably not behave as you want. It will disregard the incoming RangeObservable and simply emit "this is a test message" once and then complete. So in the instance method you will need to handle the incoming Observable, i.e. `this`.

// Instance-level operator sketch: it lives on the prototype, so it can be
// called on an observable in the middle of a chain.
Rx.Observable.prototype.write = function() {
  // `this` is the observable the operator was called on; derive the result from it.
  return this.map(/*Do something with incoming data*/);
};