RxJS zip operator equivalent in xstream?

Hello, I'm trying to figure out whether xstream has an equivalent of the RxJS zip operator, or at least a way to get the same behaviour. In case the difference needs clarifying, the marble diagrams below show it.

 zip in rxjs
    |---1---2---3-----------5->
    |-a------b------c---d----->
            "zip"
    |-1a----2b------3c-----5d->


 whereas 'combineLatest' (called 'combine' in xstream) does

    |---1---2----------4---5->
    |----a---b---c---d------->
            "combine"
    |-1a----2a-2b-2c-2d-4d-5d>
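
To spell out the difference without any stream library, here is a minimal sketch in plain JavaScript. The helper names zipEvents and combineEvents are invented for this illustration (they are not part of RxJS or xstream), and the events are simply replayed from an array in the arrival order of the 'combine' diagram above.

```javascript
// Each event is [sourceIndex, value], listed in arrival order.
// Hypothetical helpers for illustration only; not part of RxJS or xstream.

// zip: pair the n-th value of one source with the n-th value of the other.
function zipEvents(events) {
  const queues = [[], []];
  const out = [];
  for (const [source, value] of events) {
    queues[source].push(value);
    // Emit only when both queues hold an unpaired value.
    if (queues[0].length > 0 && queues[1].length > 0) {
      out.push(String(queues[0].shift()) + queues[1].shift());
    }
  }
  return out;
}

// combine: once both sources have emitted, emit the latest pair on every event.
function combineEvents(events) {
  const latest = [undefined, undefined];
  const seen = [false, false];
  const out = [];
  for (const [source, value] of events) {
    latest[source] = value;
    seen[source] = true;
    if (seen[0] && seen[1]) {
      out.push(String(latest[0]) + latest[1]);
    }
  }
  return out;
}

// Arrival order taken from the 'combine' diagram: 1, a, 2, b, c, d, 4, 5
const events = [[0, 1], [1, 'a'], [0, 2], [1, 'b'], [1, 'c'], [1, 'd'], [0, 4], [0, 5]];

console.log(zipEvents(events));     // [ '1a', '2b', '4c', '5d' ]
console.log(combineEvents(events)); // [ '1a', '2a', '2b', '2c', '2d', '4d', '5d' ]
```

With zip, 'c' and 'd' wait in a queue until 4 and 5 arrive; with combine, every new value is paired immediately with the other side's latest value.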

Any help is appreciated as I'm very new to programming with streams. Thank you in advance!

There is 1 answer below

I also needed a zip operator for xstream, so I built one from existing operators. It zips the source stream with an arbitrary number of other streams.

import xs from 'xstream';

function zip(...streams) {
  // Label the events of each extra stream with its index, starting at 1
  // (label 0 is reserved for the source stream), so that we can
  // separate them into buckets later.
  const streamsLabeled = streams
    .map((stream$, idx) => stream$.map(event => ({label: idx + 1, event: event})));
  return (event$) => {
    // Label the events of the source stream with 0.
    const eventLabeled$ = event$.map(event => ({label: 0, event: event}));
    const labeledStreams = [eventLabeled$, ...streamsLabeled];

    // Create the buckets used to store stream events
    const buckets = labeledStreams.map((stream, idx) => idx)
      .reduce((buckets, label) => ({...buckets, [label]: []}), {});

    // Initial value for the fold operation
    const accumulator = {buckets, tuple: []};

    // Merge all the streams together and accumulate them
    return xs.merge(...labeledStreams).fold((acc, event) => {
      // Buffer the events into separate buckets
      acc.buckets[event.label].push(event);

      // Does every bucket have at least one buffered event?
      // If so, then there is a complete tuple.
      const tupleComplete = Object.keys(acc.buckets)
        .every(key => acc.buckets[key].length > 0);

      // Save completed tuple and remove it from the buckets
      if (tupleComplete) {
        acc.tuple = Object.keys(acc.buckets).map(key => acc.buckets[key][0].event);
        Object.keys(acc.buckets).forEach(key => acc.buckets[key].shift());
      } else {
        // Clear tuple since all columns weren't filled
        acc.tuple = [];
      }

      return {...acc};
    }, accumulator)

    // Only emit when we have a complete tuple
    .filter(buffer => buffer.tuple.length !== 0)

    // Just return the complete tuple
    .map(buffer => buffer.tuple);
  };
}

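The operator is applied with compose. In each tuple, the source stream's value comes first, followed by the values of the streams passed to zip: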

foo$.compose(zip(bar$)).map(([foo, bar]) => doSomething(foo, bar))
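
The bucket logic inside the fold can also be tried on its own, without xstream. The following is only a sketch of that idea: createZipper is a hypothetical helper written for this illustration, not an xstream function, and it stores raw values instead of labeled event objects.

```javascript
// Hypothetical helper, not part of xstream: keeps one bucket per source
// and returns a complete tuple as soon as every bucket has a value.
function createZipper(sourceCount) {
  const buckets = Array.from({ length: sourceCount }, () => []);

  return function push(label, value) {
    // Buffer the value in the bucket of the source it came from.
    buckets[label].push(value);

    // A tuple is complete once no bucket is empty.
    if (buckets.every(bucket => bucket.length > 0)) {
      // Take the oldest value out of each bucket.
      return buckets.map(bucket => bucket.shift());
    }
    return null; // nothing to emit yet
  };
}

const push = createZipper(2);
const results = [
  push(0, 'foo1'), // null: bucket 1 is still empty
  push(0, 'foo2'), // null
  push(1, 'bar1'), // [ 'foo1', 'bar1' ]
  push(1, 'bar2'), // [ 'foo2', 'bar2' ]
  push(1, 'bar3'), // null: bucket 0 has been drained
];
console.log(results);
```

Values that arrive early stay buffered until their partner shows up, which is what distinguishes this from combine.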