RxJs - why Rx.Observable.fromNodeCallack(...)(...).retry() does not retry on error?

457 Views Asked by At

I was wondering why the following code (in coffeescript) will not retry as expected.

Rx = require 'rx'

count = 0

functToTest = (cb) ->
  console.log "count is", count
  count++
  if count is 1
    cb(new Error('some error'))
  else if count is 2
    cb(null,2)
  else if count is 3
    cb(null,3)
  else
    cb(null,4)


source = Rx.Observable.fromNodeCallback(functToTest)()

onNext = (value) ->
  console.log value

onError = (err) ->
  console.log err

onCompleted = ->
  console.log "done"

retryableSrc = source.retry(3)

retryableSrc.subscribe(onNext, onError, onCompleted)

It will output following messages and quit

count is 0
[Error: some error]

I had thought this is might because fromNodeCallback() return a hot observable. But a test as below show it is NOT.

Rx = require 'rx'

count = 0

functToTest = (cb) ->
  console.log "count is", count
  count++
  if count is 1
    cb(new Error('some error'))
  else if count is 2
    cb(null,2)
  else if count is 3
    cb(null,3)
  else
    cb(null,4)


source = Rx.Observable.fromNodeCallback(functToTest)()

onNext = (value) ->
  console.log value

onError = (err) ->
  console.log err

onCompleted = ->
  console.log "done"

retryableSrc = source.retry(3)

setTimeout ( -> ), 1000

If it was a hot observable, the program above should have printed some "count is 0" message. But in reality the program just waits 1 second and quits.

1

There are 1 best solutions below

1
On BEST ANSWER

It actually is hot, or goes hot when you first subscribe to it.

Inside of fromNodeCallback is Rx.Observable.create(...).publishLast().refCount() meaning that when you first subscribe it will execute the method, print count then emit an error. The error will be caught downstream by retry, which will resubscribe thrice only to received the cached error, which it will finally emit itself.

You can fix it by using flatMap

ncb = Rx.Observable.fromNodeCallback(functToTest);    
source = Rx.Observable.just(ncb).flatMap((fn) -> fn());