Circular dependencies with observables

194 Views Asked by At

I have a number of streams that have circular dependencies.

import input from './input'

const streamA = input.thru( someUsageOf_streamC );
const streamB = input.thru( someOtherUsageOf_streamC );
const streamC = most.merge(
                    streamA.constant('A'),
                    streamB.constant('B'),
                );

The functions passed to .thru(...) take the stream and apply some combination of operators such as .filter(), .map(), .until() but, notably (see below for why), .delay().

These are things I've tried already:

Method 1

const someUsageOf_StreamC = streamX =>
    streamX.sample( (x,c) => [x,c], streamX, streamC )
           .filter( ([x,c]) => c === 'something' )
           .map( ([x,c]) => x );

The problem here is you have to pass the streams to sample upfront but streamC can't be declared before streamA and streamB.

Method 2 (or 1.1)

import { subject } from 'most-subject'

const streamC = subject();

const streamA = ...;
const streamB = ...;
const streamC_proxy = most.merge( /* ...as above... */ );

streamC_proxy.subscribe( streamC );

This method builds on Method 1, but just proxies the events from streamC_proxy into streamC which can be declared beforehand using most-subject.

I think this actually works, but it's not testable using most-test (it requires using its own scheduler but .subscribe()/.observe() implicitly use the defaultScheduler). As I said, I use .delay() and waiting seconds/minutes for tests to run - using the default scheduler - is not feasible.

Method 3

import hold from '@most/hold'

const someUsageOf_StreamC = streamX =>
    streamX.flatMap( x => streamC.take(1) // @most/hold
                                 .filter( c => c === 'something' )
                                 .constant(x) )
           .multicast();

const streamA = ...;
const streamB = ...;
const streamC = most.merge( ... ).thru( hold );

Apart from this code being far from ideal, it should work. But it doesn't :(

Question

Can this be done in a testable manner?

0

There are 0 best solutions below