Extracting sub-streams, based on a termination attribute


I have a stream where the events look something like this:

{ 
  endOfSequence : false,
  sequenceId: 12345,
  data: [.....]
}

I need to terminate the sequence when endOfSequence === true. I started with takeWhile:

seq = stream.takeWhile(function(event) {
    return !event.endOfSequence;
});

but the problem is that I miss the last event.
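To make the problem concrete, here is a minimal plain-JavaScript sketch (no Bacon.js involved) that contrasts the exclusive behaviour of takeWhile with the inclusive behaviour wanted here. The helper name takeWhileInclusive and the sample data values are assumptions made up for illustration; only the event shape comes from the question.

```javascript
// Plain-JavaScript illustration only; this is not Bacon.js code.
// takeWhileInclusive is a hypothetical helper name, not a library function.
function* takeWhileInclusive(iterable, predicate) {
    for (const item of iterable) {
        yield item;                    // always emit the current item
        if (!predicate(item)) return;  // stop right after the first failing item
    }
}

// Sample events shaped like the ones in the question (data values are made up).
const events = [
    { endOfSequence: false, sequenceId: 12345, data: [1] },
    { endOfSequence: false, sequenceId: 12345, data: [2] },
    { endOfSequence: true,  sequenceId: 12345, data: [3] },
    { endOfSequence: false, sequenceId: 12346, data: [4] }
];

const beforeEnd = (event) => !event.endOfSequence;

// Exclusive behaviour, like takeWhile: the terminating event is dropped.
const exclusive = [];
for (const e of events) {
    if (!beforeEnd(e)) break;
    exclusive.push(e);
}

// Inclusive behaviour: the terminating event is kept, later events are not.
const inclusive = Array.from(takeWhileInclusive(events, beforeEnd));

console.log(exclusive.length); // 2
console.log(inclusive.length); // 3
```

The third event, the one with endOfSequence set to true, is the one that goes missing in the exclusive version.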

I can obviously write code that also includes the last event, for example:

function beforeEnd(event) {
    return !event.endOfSequence;
}
seq = stream.takeWhile(beforeEnd)
            .merge(stream.skipWhile(beforeEnd).take(1));

But this is a bit ugly. Is there a better way?


Best answer

You can write a custom handler with withHandler that pushes a Bacon.End() right after the event whose endOfSequence is true, so the last event is still delivered.

seq = stream.withHandler(function(event) {
    var ret = this.push(event);  // forward every event, including the last one
    if (event.hasValue() && event.value().endOfSequence) {
        ret = this.push(new Bacon.End());  // then end the stream
    }
    return ret;
});

A working example can be found in this jsFiddle.

Another answer

Accepted answer adapted for Bacon.js v3:

const takeUntilLastEventSatisfies = (predicate, observable) =>
    observable.transform((baconEvent, sink) => {
        if (!Bacon.hasValue(baconEvent)) { // error or end: pass through unchanged
            return sink(baconEvent);
        }

        if (predicate(baconEvent.value)) {
            sink(baconEvent);             // emit the terminating event first
            return sink(new Bacon.End()); // then end the stream
        }

        return sink(baconEvent);
    });