Proxy an Observable and connect it in callback

567 Views Asked by At

I'm trying to return an Observable that is created asynchronously in a callback:

const mkAsync = (observer, delay) =>
  setTimeout(() => Observable.of('some result').subscribe(observer), delay)

const create = arg => {
  const ret = new Subject()
  mkAsync(ret, arg)
  return ret
}

Therefore I use a Subject as a unicast proxy which is subscribed to the underlying Observable in the callback.

The problem I have with this solution is that when I unsubscribe from the Subject's subsrciption the unsubscribe isn't forwarded to the underlying Observable. Looks like I need some type of refcounting to make the Subject unsubscribe when there are no more subscribers, but I wasn't able to figure it out when using it in this kind of imperative callback style.

I have to keep the mkAsync a void and am looking for an alternative implementation.

Is that the right way to do it? Is there an alternative solution to using a Subject?

How do I make sure that the created Observable is cancelled (unsubscribe is called on the Subscription) when the Subject is unsubscribed from?

1

There are 1 best solutions below

1
On

This is pretty broad question and it's hard to tell what are you trying to achieve with this. I have two ideas:

The first thing is that there is refCount() operator that exists only on ConnectableObservable class that is returned from multicast (or publish) depending on the parameters you pass. See implementation for more details (basically if you don't set any selector function): https://github.com/ReactiveX/rxjs/blob/5.5.11/src/operators/multicast.ts

The second issue I can think of is that you're doing basically this:

const ret = new Subject()
Observable.of(...).subscribe(ret);

The problem with this is that .of will emit immediately it's next items and then it sends the complete notification. Subjects have internal state and when Subject receives the complete notification it marks itself as stopped and it will never ever emit anything.

I'm suspicious that's what's happening to you. Even when you return the Subject instance with return ret and later probably subscribe to it you still won't receive anything because this Subject has already received the complete notification.