I'm trying to return an Observable that is created asynchronously in a callback:
const mkAsync = (observer, delay) =>
setTimeout(() => Observable.of('some result').subscribe(observer), delay)
const create = arg => {
const ret = new Subject()
mkAsync(ret, arg)
return ret
}
Therefore I use a Subject as a unicast proxy which is subscribed to the underlying Observable in the callback.
The problem I have with this solution is that when I unsubscribe from the Subject's subsrciption the unsubscribe isn't forwarded to the underlying Observable. Looks like I need some type of refcounting to make the Subject unsubscribe when there are no more subscribers, but I wasn't able to figure it out when using it in this kind of imperative callback style.
I have to keep the mkAsync a void and am looking for an alternative implementation.
Is that the right way to do it? Is there an alternative solution to using a Subject?
How do I make sure that the created Observable is cancelled (unsubscribe is called on the Subscription) when the Subject is unsubscribed from?
This is pretty broad question and it's hard to tell what are you trying to achieve with this. I have two ideas:
The first thing is that there is
refCount()operator that exists only onConnectableObservableclass that is returned frommulticast(orpublish) depending on the parameters you pass. See implementation for more details (basically if you don't set anyselectorfunction): https://github.com/ReactiveX/rxjs/blob/5.5.11/src/operators/multicast.tsThe second issue I can think of is that you're doing basically this:
The problem with this is that
.ofwill emit immediately it'snextitems and then it sends thecompletenotification. Subjects have internal state and whenSubjectreceives thecompletenotification it marks itself asstoppedand it will never ever emit anything.I'm suspicious that's what's happening to you. Even when you return the Subject instance with
return retand later probably subscribe to it you still won't receive anything because this Subject has already received thecompletenotification.