I'm trying to return an Observable that is created asynchronously in a callback:
const mkAsync = (observer, delay) =>
  setTimeout(() => Observable.of('some result').subscribe(observer), delay)

const create = arg => {
  const ret = new Subject()
  mkAsync(ret, arg)
  return ret
}
Therefore I use a Subject as a unicast proxy which is subscribed to the underlying Observable in the callback.
The problem I have with this solution is that when I unsubscribe from the Subject's subscription, the unsubscribe isn't forwarded to the underlying Observable. It looks like I need some kind of refcounting to make the Subject unsubscribe when there are no more subscribers, but I wasn't able to figure it out in this kind of imperative callback style.
I have to keep mkAsync returning void and am looking for an alternative implementation.
Is that the right way to do it? Is there an alternative solution to using a Subject?
How do I make sure that the created Observable is cancelled (unsubscribe is called on the Subscription) when the Subject is unsubscribed from?
This is a pretty broad question and it's hard to tell what you are trying to achieve with this. I have two ideas:
The first thing is that there is a refCount() operator, but it exists only on the ConnectableObservable class that is returned from multicast (or publish), depending on the parameters you pass (basically, if you don't set any selector function). See the implementation for more details: https://github.com/ReactiveX/rxjs/blob/5.5.11/src/operators/multicast.ts
The second issue I can think of is that you're basically subscribing the Subject directly to Observable.of('some result').
The problem with this is that .of will emit its next items immediately and then send the complete notification. Subjects have internal state, and when a Subject receives the complete notification it marks itself as stopped and will never emit anything again.
I suspect that's what's happening to you. Even when you return the Subject instance with return ret and subscribe to it later, you still won't receive any values because this Subject has already received the complete notification.
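To make the stopped state concrete, here is a small toy model of it. This is only a sketch: ToySubject and ofSomeResult are hypothetical stand-ins written for illustration, not the RxJS classes, and they assume the behavior described above (values arriving after complete are dropped, and a late subscriber sees only the complete notification).

```javascript
// Toy model only: ToySubject is a hypothetical stand-in, NOT the RxJS Subject.
class ToySubject {
  constructor() {
    this.observers = [];
    this.isStopped = false;
  }

  subscribe(observer) {
    if (this.isStopped) {
      // A stopped subject has nothing left to emit except the terminal signal.
      observer.complete();
      return;
    }
    this.observers.push(observer);
  }

  next(value) {
    if (this.isStopped) return; // values after complete are silently dropped
    this.observers.forEach(o => o.next(value));
  }

  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.forEach(o => o.complete());
    this.observers = [];
  }
}

// Hypothetical helper mimicking Observable.of('some result').subscribe(observer):
// one value, then complete, both delivered synchronously.
const ofSomeResult = observer => {
  observer.next('some result');
  observer.complete();
};

const subject = new ToySubject();
ofSomeResult(subject); // nobody is subscribed yet; the subject is now stopped

const received = [];
subject.subscribe({
  next: v => received.push(v),
  complete: () => received.push('<complete>'),
});
subject.next('too late'); // ignored because the subject is stopped

console.log(received); // [ '<complete>' ]
```

The late subscriber never sees 'some result'. If the subscriber were attached before ofSomeResult(subject) ran, it would receive the value followed by the complete notification, so the order of subscription matters here.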