How to collect every Observable emission into one object in RxJava 2 / RxKotlin 2?

An Observable emits Trade objects. How can I apply an operation to each emission and accumulate the outcome in a single Result object?

Trade(tradeType: String, profit: BigDecimal)
Result(totalProfit: BigDecimal)

Example (pseudocode):

trades
    .scan(result: Result, currentTrade: Trade) {
        result.totalProfit += currentTrade.profit
    }
    .subscribe {
        println(it.totalProfit)
    }

Answer:

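Do you have something like this in mind? Please provide a full example next time. Note that the test below imports RxJava 3 (io.reactivex.rxjava3.core.Flowable); in RxJava 2 the class is io.reactivex.Flowable, and scan takes the same seed and accumulator arguments.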

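import io.reactivex.rxjava3.core.Flowable
import org.junit.jupiter.api.Test
import java.math.BigDecimal

class SoScan {
    // profit is a BigDecimal, as in the question, to avoid Double rounding
    internal data class Trade(val profit: BigDecimal)

    internal data class Result(val total: BigDecimal)

    // Initial accumulator; scan emits it before any trade is processed
    private val seed = Result(BigDecimal.ZERO)

    @Test
    fun scan() {
        Flowable.just(Trade(BigDecimal.ONE), Trade(BigDecimal.ONE))
            // Build a new immutable Result from the previous one for each trade
            .scan(seed) { prev, curr -> Result(prev.total + curr.profit) }
            .test()
            // Expect the seed followed by one running total per trade
            .assertValues(seed, Result(BigDecimal(1)), Result(BigDecimal(2)))
    }
}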
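
If only the final total is needed rather than every intermediate value, reduce with a seed may be a closer fit than scan, since it emits a single item once the source completes. The following is a minimal sketch, assuming RxJava 3 is on the classpath; the helper totalOf and the sample trades are hypothetical names used only for illustration.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import java.math.BigDecimal

data class Trade(val tradeType: String, val profit: BigDecimal)
data class Result(val totalProfit: BigDecimal)

// Hypothetical helper: folds all trades into one Result.
// reduce(seed, accumulator) returns a Single that emits only after the source completes;
// blockingGet() waits for that value, which is convenient in a demo but blocks the caller.
fun totalOf(trades: Flowable<Trade>): Result =
    trades
        .reduce(Result(BigDecimal.ZERO)) { acc, trade ->
            // Return a new immutable Result instead of mutating the accumulator
            Result(acc.totalProfit + trade.profit)
        }
        .blockingGet()

fun main() {
    val trades = Flowable.just(
        Trade("BUY", BigDecimal("1.50")),
        Trade("SELL", BigDecimal("2.25"))
    )
    println(totalOf(trades).totalProfit) // prints 3.75
}
```

With an empty source, reduce with a seed emits the seed itself, so totalOf would return a Result with a total of zero. In non-blocking code, subscribing to the Single would replace the blockingGet() call.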