Problems when merging a sampled Observable


I'm using rx-scala, which is a subproject of rx-java. I'll be using Scala syntax and hope that everyone understands.

I'm encountering odd behavior, and I don't know whether it's a bug or a misuse of the rx operators on my part.

Problem Statement

I have an observable ox: Observable[X] and a trigger observable tr: Observable[Unit]. I want an observable oy that applies a function f: Function[X,Y] to the values of ox, but only when triggered, since f is potentially expensive.

If the latest value of ox has not been transformed yet, oy should emit null.
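
To pin down the intended behavior independently of Rx, here is a minimal sketch in plain Scala that replays a list of events in order and computes what oy is expected to emit. The names Event, Value, Trigger and expectedOy are made up for this illustration and are not part of rx-scala; None stands in for null, and the sketch assumes a trigger before the first ox value emits nothing.

```scala
// Hypothetical model of the desired semantics; no rx-scala types involved.
object OyModel {
  sealed trait Event[+X]
  final case class Value[+X](x: X) extends Event[X] // ox emitted x
  case object Trigger extends Event[Nothing]        // tr fired

  /** Replays the events in order and returns what oy should emit.
    * None plays the role of the question's null. */
  def expectedOy[X, Y](events: List[Event[X]])(f: X => Y): List[Option[Y]] = {
    val (_, emitted) =
      events.foldLeft((Option.empty[X], Vector.empty[Option[Y]])) {
        // A new ox value invalidates the previous result: remember it, emit "null".
        case ((_, out), Value(x)) => (Some(x), out :+ None)
        // A trigger applies f to the latest ox value, if there is one.
        case ((latest, out), Trigger) =>
          latest match {
            case Some(x) => (latest, out :+ Some(f(x)))
            case None    => (latest, out) // assumption: nothing to transform yet
          }
      }
    emitted.toList
  }

  def main(args: Array[String]): Unit = {
    val events: List[Event[Int]] =
      List(Value(1), Value(2), Trigger, Value(3), Trigger)
    // prints List(None, None, Some(20), None, Some(30))
    println(expectedOy(events)(x => x * 10))
  }
}
```

In this model, f is only ever evaluated on a trigger, and always on the most recent ox value, which is the property the real pipeline violates.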

Some remarks:

  • ox is hot, as it is the result of UI events.
  • ox behaves correctly (both values and timing), as I checked with println debugging.
  • oy fires at the correct times; it just uses outdated values of ox whenever it emits a non-null value.

Current Code

oy = ox.sample(tr).map(f).merge(ox.map(x => null))

The problem with the above code is that it works initially, but after some time, triggering tr makes oy apply f to old values of ox. If I then leave ox unchanged and trigger tr repeatedly, the results get newer and eventually catch up.

If I remove the merge, so that oy is never reset to null, everything seems to work fine (I can't be sure, since the effect appears to be non-deterministic).

Question

The code presented above is buggy.

  1. I'd like to know whether I'm doing something wrong.
  2. I welcome alternative ways of achieving what I need.

For the Java people

  • generics/type annotation: ox: Observable[X] means Observable<X> ox
  • lambdas: x => null means x -> null

Answer

I'm now using the following solution, which is short and so far has not shown the problem:

tr.withLatestFrom(ox)((_, x) => f(x)) merge ox.map(_ => null)