Create Observable from side-effects in Do block


With Reactive Extensions it is generally recommended to keep the functions passed to Select pure, i.e. free of side effects. When a side effect is needed, either place it at the end of the pipeline with a Subscribe or, as a last resort, in the middle of the pipeline with a Do.
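As a small illustration of that guideline, the sketch below keeps Select pure and puts the side effects in Do and Subscribe. It assumes the System.Reactive NuGet package is referenced; the class name `SideEffectPlacement` and the logging list are hypothetical and only serve the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SideEffectPlacement
{
    // Hypothetical demo: collects the observed side effects in a list.
    public static List<string> Run()
    {
        var log = new List<string>();

        Observable.Range(1, 3)
            // Pure projection: nothing but a computation inside Select.
            .Select(x => x * 10)
            // Side effect in the middle of the pipeline; values pass through unchanged.
            .Do(x => log.Add($"Do: {x}"))
            // Side effect at the end of the pipeline.
            .Subscribe(x => log.Add($"Subscribe: {x}"));

        return log;
    }
}
```

Each value reaches the Do callback before it reaches the Subscribe callback, so the two kinds of side effect interleave per element.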

Ideally I would go for the last option (Do), but in my current situation, similar to the following block of code, I have a Select with side-effects, as I need to know the outcome from the operation in order to perform some logic later on. Examples of the outcome could be:

  • whether the operation completed successfully or failed, in order to show errors to users;

  • some number crunched by the operation, that is needed in a further observable pipeline.

Anyway, here's a code sample:

IObservable<Unit> inputStream = /* generate stream... */;

IObservable<OperationResult> operationOneStream = inputStream.Select(_ => 
{
  try 
  {
    /* side effect: do something, that can also throw... */

    return OperationResult.Success;  // use a simple Enum
  }
  catch (Exception ex)
  {
    /* compensate error: do something ... */

    return OperationResult.ErrorInOperationOne;
  }
});

// operationOneStream is used to build further streams down the chain, it's not "final".

IObservable<OperationResult> operationTwoStream = operationOneStream
  .Where(result => result == OperationResult.Success)
  // here in between some more operations: delay, select returning a stream, switch, ...
  .Select(_ => 
  {
    try 
    {
      /* side effect: do something, that can also throw... */

      return OperationResult.Success;
    }
    catch (Exception ex)
    {
      /* compensate error: do something ... */

      return OperationResult.ErrorInOperationTwo;
    }
  });

// then a third operation, running when nr. 2 is successful

/* ... */

/* Then, operationOne could be merged with other similar operations,
 * so as to collect their error streams and handle them in a single block.
 */
IObservable<OperationResult> allOperationsStream = Observable.Merge(
  operationOneStream, operationTwoStream /*, ... */);

IDisposable subscription = allOperationsStream
  .Where(result => result != OperationResult.Success)
  .Subscribe(result => { /* do something with error */ });

Now I'd like to understand whether there's a way to switch from Do to Subscribe, in order to be even more explicit about those side effects. In that case the Subscribe terminates the chain, so to capture the output of the operation and use it later I would resort to a Subject and feed it when the operation completes or fails. But then again, Subjects are generally discouraged unless really needed.
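A minimal sketch of that Subject-based alternative, assuming the System.Reactive package. The names `SubjectFeedSketch` and `OutcomeKind` are hypothetical stand-ins (the latter for OperationResult), and the input stream is reduced to a single Unit value.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for the OperationResult enum.
public enum OutcomeKind { Success, Error }

public static class SubjectFeedSketch
{
    public static List<OutcomeKind> Run(Action operation)
    {
        var outcomes = new Subject<OutcomeKind>();
        var seen = new List<OutcomeKind>();

        // Later logic observes the outcomes through the subject.
        using var downstream = outcomes.Subscribe(r => seen.Add(r));

        // The side effect lives in Subscribe; its outcome is pushed into the subject.
        using var upstream = Observable.Return(Unit.Default).Subscribe(_ =>
        {
            OutcomeKind outcome;
            try { operation(); outcome = OutcomeKind.Success; }
            catch (Exception) { outcome = OutcomeKind.Error; }
            outcomes.OnNext(outcome);
        });

        return seen;
    }
}
```

This works, but the subject becomes mutable shared state between two otherwise separate pipelines, which is the usual reason it is discouraged.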

Is there a more expressive way to write this? Thanks all

EDIT: after various comments, I added more code to show a more complete example. I also fixed some errors in my description of the current situation, which I think were confusing.


There is 1 best solution below

Not sure I entirely follow what you're after, but possibly you're just trying to chain multiple observables together using SelectMany?

(from a in Observable.Return(1)
 from b in Observable.Return(2)
 select b
).Subscribe(x => {}, ex => {});

You could also build the error handling into this like so:

(from a in Observable.Return(1)
 from result in Observable.Return(2)
                          .Select(x => OperationResult.Success)
                          .Catch<OperationResult, Exception>(
                              ex => Observable.Return(OperationResult.Error))
 select result)
.Subscribe(x => {}, ex => {});
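Applied to the question's scenario, the same idea might look like the sketch below: each side-effecting operation becomes its own observable, chained with SelectMany. It assumes the System.Reactive package; `RunOperation` and `OperationPipeline` are hypothetical names, and the enum is redeclared here only so the sample is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

// Redeclared for the sample; mirrors part of the enum used in the question.
public enum OperationResult { Success, ErrorInOperationOne }

public static class OperationPipeline
{
    // Hypothetical helper: runs the side effect on subscription and maps
    // a normal return or a thrown exception to an OperationResult.
    public static IObservable<OperationResult> RunOperation(
        Action operation, OperationResult onError) =>
        Observable.Defer<OperationResult>(() =>
            {
                operation();
                return Observable.Return(OperationResult.Success);
            })
            .Catch<OperationResult, Exception>(ex => Observable.Return(onError));

    public static List<OperationResult> Run(Action operation)
    {
        var results = new List<OperationResult>();

        Observable.Return(Unit.Default)
            // SelectMany flattens the per-input operation into the outer stream.
            .SelectMany(_ => RunOperation(operation, OperationResult.ErrorInOperationOne))
            .Subscribe(r => results.Add(r));

        return results;
    }
}
```

Because the exception is converted into a value by Catch, the outer stream keeps running and later operations can filter on the result.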