How to order stream merges?

68 Views Asked by At

I would like to merge two maps of the same source guaranteeing the order of the result. Here is a unit test that I would like to pass:

  const source = xs.of(1,2,3)
  const a = source.map(v=>v*10)
  const b = source.map(v=>v*100)
  const hist:number[] = []
  xs.merge(a,b).addListener({
    next: v=>hist.push(v),
  })
  expect(hist).toEqual([10,100,20,200,30,300])

Currently the result I'm getting is this:

Expected value to equal:
  [10, 100, 20, 200, 30, 300]
Received:
  [10, 20, 30, 100, 200, 300]
2

There are 2 best solutions below

1
On

Taking a clue from user3743222's answer, using periodic to sequence the list.

Changing:

//const source = xs.of(1,2,3)
const source = xs.periodic(1).drop(1).take(3)

...produces in the console:

10 100 20 200 30 300

ESNextbin demo.

drop(1) is one way to deal with periodic starting with 0. One can alternatively use ++v*10 in the map function.

1
On

I am no expert with xstream, so I can't propose you a solution. However, I think I can explain why you are getting the output you are getting as this is a common occurrence in other streaming libraries.

You have a merge of two sources. The of operator guarantees that it will emit the array values in order, the map operator guarantees that it will emits the transformed values in the same order as the received values, etc. But merge(a,b) does not guarantee that it will interleave value of a and b. It does guarantee that it will pass on the values of a in order, those of b in order, i.e. it guarantees only a partial order on the resulting output.

The question of, given some values to emit, which ones to emit at what time, and in which order relates to scheduling. I am not aware of xstream at this point of time exposing a scheduler interface , from which you could customize the scheduling of emission of values. Hence you are bound to the default scheduling.

Now back to why you observe those values in that order :

  • merge(a,b) connects first to a
    • a emits immediately and synchronously all the values from the array
  • merge(a,b) then connects to b
    • b emits immediately and synchronously all the values from the array

If you want to get the values 1, 2, 3 not emitted synchronously, you need to use another operator than of to do so, or construct your timed sequence explicitly, i.e. emit 1 at t0, 2 at t0+1, 3 at t0+2.