With Reactive Extensions, the general advice is to keep Select pure, i.e. free of side effects. When a side effect is needed, place it either at the end of the pipeline in a Subscribe or, as a last resort, in the middle of the pipeline with a Do.
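To make the three placements concrete, here is a minimal sketch rather than my real code. It assumes the System.Reactive NuGet package; the class name SideEffectPlacement and the log list are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SideEffectPlacement
{
    // Hypothetical helper: records where each side effect happens.
    public static List<string> Run()
    {
        var log = new List<string>();

        IDisposable subscription = Observable.Range(1, 3)
            // Pure projection: no side effects, only a new value.
            .Select(x => x * 10)
            // Side effect in the middle of the pipeline; values pass through unchanged.
            .Do(x => log.Add($"Do: {x}"))
            // Side effect at the end of the pipeline.
            .Subscribe(x => log.Add($"Subscribe: {x}"));

        subscription.Dispose();
        return log;
    }
}
```

For each value, the Do callback runs before the Subscribe callback, so the log alternates between the two.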
Ideally I would use Do, but in my current situation, similar to the block of code below, I have a Select with side effects, because I need to know the outcome of the operation in order to perform some logic later on. Examples of the outcome could be:
whether the operation completed successfully or failed, in order to show errors to users;
some number crunched by the operation, that is needed in a further observable pipeline.
Anyway, here's a code sample:
IObservable<Unit> inputStream = /* generate stream... */;

IObservable<OperationResult> operationOneStream = inputStream.Select(_ =>
{
    try
    {
        /* side effect: do something, that can also throw... */
        return OperationResult.Success; // use a simple Enum
    }
    catch (Exception ex)
    {
        /* compensate error: do something ... */
        return OperationResult.ErrorInOperationOne;
    }
});

// operationOneStream is used to build further streams down the chain, it's not "final".
IObservable<OperationResult> operationTwoStream = operationOneStream
    .Where(result => result == OperationResult.Success)
    // here in between some more operations: delay, select returning a stream, switch, ...
    .Select(_ =>
    {
        try
        {
            /* side effect: do something, that can also throw... */
            return OperationResult.Success;
        }
        catch (Exception ex)
        {
            /* compensate error: do something ... */
            return OperationResult.ErrorInOperationTwo;
        }
    });

// then a third operation, running when nr. 2 is successful
/* ... */

/* Then, operationOne could be merged with other similar operations,
 * so to grab their error streams and treat them in a single block.
 */
// Both merged streams are IObservable<OperationResult>, so the merged stream has the same element type.
IObservable<OperationResult> allOperationsStream = Observable.Merge(
    operationOneStream, operationTwoStream /*, ... */);

IDisposable subscription = allOperationsStream
    .Where(result => result != OperationResult.Success)
    .Subscribe(result => { /* do something with error */ });
Now I'd like to understand if there's a way to switch from Do to Subscribe in order to be even more explicit about those side effects. In that case, the Subscribe ends the chain, so in order to grab the output from the operation and use it later, I would resort to a Subject and feed it when the operation completes or fails. But then again, Subjects are generally discouraged unless really needed.
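For reference, the Subject-based variant I have in mind would look roughly like the sketch below. It assumes the System.Reactive package; the class SubjectSketch, the operation parameter, and the two-value OperationResult enum are placeholders for illustration, not my real types.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Placeholder enum, standing in for the real OperationResult.
public enum OperationResult { Success, ErrorInOperationOne }

public static class SubjectSketch
{
    // Hypothetical helper: runs `operation` inside Subscribe and
    // republishes its outcome through a Subject.
    public static List<OperationResult> Run(Action operation)
    {
        var seen = new List<OperationResult>();

        // The Subject carries the outcome past the Subscribe that ends the first chain.
        using var results = new Subject<OperationResult>();
        using var downstream = results.Subscribe(r => seen.Add(r));

        IObservable<Unit> inputStream = Observable.Return(Unit.Default);
        using var upstream = inputStream.Subscribe(_ =>
        {
            OperationResult outcome;
            try
            {
                operation(); // the side effect, which may throw
                outcome = OperationResult.Success;
            }
            catch (Exception)
            {
                outcome = OperationResult.ErrorInOperationOne;
            }
            // Feed the Subject outside the try block so that a failing
            // downstream handler is not mistaken for an operation error.
            results.OnNext(outcome);
        });

        return seen;
    }
}
```

This makes the side effect explicit, but at the cost of the extra Subject and manual wiring, which is exactly what I would like to avoid.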
Is there a more expressive way to write this? Thanks, all.
EDIT: after various comments, I added more code to show a more complete example. I also fixed some errors in my description of the current situation, which I think were confusing.
Not sure I entirely follow what you're after, but possibly you're just trying to chain multiple observables together using SelectMany?
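As a rough illustration of that idea, and only a sketch under assumptions: each operation is modelled as a method returning an observable, and the result of one feeds the next through SelectMany. It assumes the System.Reactive package; OperationOne, OperationTwo, and the integer payloads are hypothetical stand-ins for your real operations.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class SelectManySketch
{
    // Hypothetical first operation. Defer postpones the work until subscription.
    static IObservable<int> OperationOne() =>
        Observable.Defer(() => Observable.Return(21));

    // Hypothetical second operation, consuming the first one's output.
    static IObservable<int> OperationTwo(int input) =>
        Observable.Defer(() => Observable.Return(input * 2));

    public static List<int> Run()
    {
        var outputs = new List<int>();
        IObservable<Unit> inputStream = Observable.Return(Unit.Default);

        using var subscription = inputStream
            // Each input triggers operation one...
            .SelectMany(_ => OperationOne())
            // ...and each result of operation one triggers operation two.
            .SelectMany(n => OperationTwo(n))
            .Subscribe(n => outputs.Add(n));

        return outputs;
    }
}
```

With these stand-in operations, Run collects the single value 42: the number computed by the first operation stays available to the second one without a Subject.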
You could also build the error handling into this like so: