Sometimes the business logic seems to be able to naturally modeled by some recursive defined observables. Here is one example:
interface Demo {
IObservable<CommandId> userCommands;
IObservable<IObservable<IProcessingState>> processes;
IObservable<CommandId> skippedCommands;
IObservable<(CommandId, CommandResult)> RunCommand(CommandId id);
}
interface IProcessingState {
bool IsProcessing {get;}
CommandId? ProcessingId {get;}
}
For each command user inputs, it should either trigger a running process in prcocess, or emit one value in skippedCommands. Some direct translate of this logic maybe
var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing)
var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing)
var processes = validCommands.Select(c => RunCommand(c))
As code above indicated, assign of validCommands and processes are mutual recursive, we can equivalently define processes directly using itself recursively by
var processes = userCommands.WithLatestFrom(processes)
.Where(x => !x.Item2.IsProcessing)
.Select(c => RunCommand(c))
However we can not define prcesses Observable in C# like this.
I've found several possible related things:
Observable.Generateconstructor. However it seems that it is folding on its own state in a synchronize way, I don't know how to useuserCommandsobservable andRunCommandinObservable.Generate;Some operator like
exhaustorexhaustMapin RxJS, while Rx.Net didn't provide this operator, there are serval 3rd-party libraries provided these operators, like FSharp.Control.Reactive. The implementation is something like
let exhaustMap f source =
Observable.Create (fun (o : IObserver<_>) ->
let mutable hasSubscription = false
let mutable innerSub = None
let onInnerCompleted () =
hasSubscription <- false
innerSub |> Option.iter Disposable.dispose
let onOuterNext x =
if not hasSubscription then
hasSubscription <- true
f x |> subscribeSafeWithCallbacks
o.OnNext o.OnError onInnerCompleted
|> fun y -> innerSub <- Some y
source
|> subscribeSafeWithCallbacks
onOuterNext o.OnError o.OnCompleted)
However, there are two problems. a. directly use this operator does not fit requirements above, skipped commands will be silently ignored. We can modify source code a litter to fit the requirements, but there is still another problem b. the implementation introduced two local mutable variables, and two nested subscription. I don't know if this is ok in all cases (will there be risk of data racing?), and prefer solutions based on composition of operators other than mutable references
SodiumFRP provided forward references types
StreamLoopandCellLoop. And by the Functional Reactive Programming book, Rx alternative for these forward references types would beSubject, by usingSubjectthe recursive construct above is separated into two phases. The problem is by Intro to Rx indicated, usingSubjectrequires manually manage more state, at least dispose of the subject is required, and maybe forced to hot observables. I'm wondering if there exists solutions without usingSubjectUsing
windowoperator with boundary on last value (just before finish) onRunCommandresults, theprocessesabove can some how be constructed, but this solution need to use ending signal twice, which requires careful treatment (quiet a while on trying and tuningTake(1),zip,withLatestFrom,combineLatest, overloads ofWindowoperators to get desired result) on simultaneous events.
Are there better solutions or modification to solutions above to this problem, especially only use operators?
Your types are all weird and hard to work with. Your question is fundamentally a simple state machine with two triggers: 1) New command arrives, 2) Previous command finishes executing.
This should get you started:
sourcehere doesn't have to be a subject, and probably shouldn't be: It can be anyIObservable<ICommand>. It's just easier to mock up in an answer. TheexeuctionTerminatedLoopbackthough does have to be, to (as you put it) break up the recursion into two parts. Because it's a subject, it should be kept private and not leaked out.I don't think there are any answers in C# without using Subject.