Asynchronous call for every item inside a collection

663 Views Asked by At

I have a problem which I was unable to solve so far I'm new to RxKotlin so it might be easy. Please have a look at the code:

    override fun infos(): Stream<Info> =
        client.infoAboutItem(identifier)
                .map {
                    val itemId = it.itemId ?: ""
                    val item = client.itemForId(itemId)
                    ClientInfo(client, it, source, item) as Info
                }
                .let { AccessStream(it) }

Where stream is our self-made collection. Map is a method that allows you to iterate over every item inside that collection.

The problem here is that

 client.itemForId(itemId)

is an http call that returns a Single which is not ideal.

I would want to create an asynchronous call inside map that would return Item instead of Single and then pass it to ClientInfo. The things that I've tried so far was was using subscribing inside the map and using blockingGet() method but this blocks the main thread even if I observe and subscribe on a different thread

So it involves making an asynchronous call for every thing in the collection.

Thanks for help

2

There are 2 best solutions below

2
On

You can try to return Observable<Stream<Info>> and then it would look like:

   override fun infos(): Observable<Stream<Info>> = 
                Observable.from(client.infoAboutItem(identifier))
                        .flatMapSingle {
                            val itemId = it.itemId ?: ""
                            client.itemForId(itemId)
                        }
                        .map { 
                            ClientInfo(client, it, source, item) as Info
                         }
                        .toList()
                        .flatMap {
                            AccessStream(it)
                        }
0
On

You should wrap those expensive operation into a observable and use a flat map to zip those data into a Client Info.

I've written a small sample to show it off.

class SimpleTest {
  val testScheduler = TestScheduler()

  @Test
  fun test() {
    infos().observeOn(Schedulers.immediate())
        .subscribe { logger("Output", it.toString()) }

    testScheduler.advanceTimeBy(10, TimeUnit.MINUTES)
  }

  fun infos(): Single<List<ClientInfo>> {
    return Observable.from(infoAboutItem("some_identifier"))
        .doOnNext { logger("Next", it.toString()) }
        .flatMap { aboutItem ->
          Observable.fromCallable { itemForId(aboutItem.itemId) }
              .subscribeOn(testScheduler)
              .map { ClientInfo(aboutItem = aboutItem, item = it) }
        }
        .doOnNext { logger("Next", it.toString()) }
        .toList()
        .toSingle()
  }

  data class ClientInfo(
      val id: String = UUID.randomUUID().toString(),
      val aboutItem: AboutItem,
      val item: Item
  )

  data class AboutItem(val itemId: String = UUID.randomUUID().toString())
  data class Item(val id: String = UUID.randomUUID().toString())

  fun infoAboutItem(identifier: String): List<AboutItem> {
    return (1..10).map { AboutItem() }
  }

  fun itemForId(itemId: String): Item {
    val sleepTime = Random().nextInt(1000).toLong()
    Thread.sleep(sleepTime)
    return Item()
  }

  fun logger(tag: String, message: String): Unit {
    val formattedDate = Date(Schedulers.immediate().now()).format()
    System.out.println("$tag @ $formattedDate: $message")
  }

  fun Date.format(): String {
    return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this)
  }
}