I have a number of streams that have circular dependencies.
import input from './input'
const streamA = input.thru( someUsageOf_streamC );
const streamB = input.thru( someOtherUsageOf_streamC );
const streamC = most.merge(
streamA.constant('A'),
streamB.constant('B'),
);
The functions passed to .thru(...)
take the stream and apply some combination of operators such as .filter()
, .map()
, .until()
but, notably (see below for why), .delay()
.
These are things I've tried already:
Method 1
const someUsageOf_StreamC = streamX =>
streamX.sample( (x,c) => [x,c], streamX, streamC )
.filter( ([x,c]) => c === 'something' )
.map( ([x,c]) => x );
The problem here is you have to pass the streams to sample upfront but streamC
can't be declared before streamA
and streamB
.
Method 2 (or 1.1)
import { subject } from 'most-subject'
const streamC = subject();
const streamA = ...;
const streamB = ...;
const streamC_proxy = most.merge( /* ...as above... */ );
streamC_proxy.subscribe( streamC );
This method builds on Method 1, but just proxies the events from streamC_proxy
into streamC
which can be declared beforehand using most-subject.
I think this actually works, but it's not testable using most-test (it requires using its own scheduler but .subscribe()
/.observe()
implicitly use the defaultScheduler
). As I said, I use .delay()
and waiting seconds/minutes for tests to run - using the default scheduler - is not feasible.
Method 3
import hold from '@most/hold'
const someUsageOf_StreamC = streamX =>
streamX.flatMap( x => streamC.take(1) // @most/hold
.filter( c => c === 'something' )
.constant(x) )
.multicast();
const streamA = ...;
const streamB = ...;
const streamC = most.merge( ... ).thru( hold );
Apart from this code being far from ideal, it should work. But it doesn't :(
Question
Can this be done in a testable manner?