Converting thunk to sequence upon iteration


I have a server API that returns a list of things, and does so in chunks of, let's say, 25 items at a time. With every response, we get a list of items, and a "token" that we can use for the following server call to return the next 25, and so on.

Please note that we're using a client library that has been written in stodgy old mutable Java, and doesn't lend itself nicely to all of Scala's functional compositional patterns.

I'm looking for a way to return a lazily evaluated sequence of all server items, by doing a server call with the latest token whenever the local list of items has been exhausted. What I have so far is:

def fetchFromServer(uglyStateObject: StateObject): Seq[Thing] = {
    val results = server.call(uglyStateObject)

    uglyStateObject.update(results.token())

    results.asScala.toList ++ (if (results.moreAvailable())
        fetchFromServer(uglyStateObject)
    else
        List())
}

However, this function does eager evaluation. What I'm looking for is to have ++ concatenate a "strict sequence" and a "lazy sequence", where a thunk will be used to retrieve the next set of items from the server. In effect, I want something like this:

results.asScala.toList ++ Seq.lazy(() => fetchFromServer(uglyStateObject))

Except I don't know what to use in place of Seq.lazy.

Things I've seen so far:

  • SeqView, but I've seen comments that it shouldn't be used because it re-evaluates all the time?
  • Streams, but the abstraction seems designed to generate one element at a time, whereas I want to generate a bunch of elements at a time.

What should I use?


There are 2 best solutions below


I also suggest taking a look at scalaz-stream. Here is a small example of how it might look:

  import scalaz.stream._
  import scalaz.concurrent.Task

  // Returns updated state + fetched data
  def fetchFromServer(uglyStateObject: StateObject): (StateObject, Seq[Thing]) = ???

  // Initial state
  val init: StateObject = new StateObject

  val p: Process[Task, Thing] = Process.repeatEval[Task, Seq[Thing]] {
    var state = init
    Task(fetchFromServer(state)) map {
      case (s, seq) =>
        state = s
        seq
    }
  } flatMap Process.emitAll
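
The snippet above leaves fetchFromServer unimplemented and does not show how the process stops. For comparison, the same "fetch a chunk, thread the state along, flatten the chunks" shape can be sketched with only the standard library. This is a rough sketch that assumes Scala 2.13 or later (for Iterator.unfold); Page, fetchPage and the Int tokens are made up for illustration and stand in for the real client:

```scala
// Sketch only: Page and fetchPage are hypothetical stand-ins for the real client.
object UnfoldPaging {
  final case class Page(items: List[Int], nextToken: Option[Int])

  // Pretend server: 60 items, served 25 at a time; None means "no more pages".
  def fetchPage(token: Int): Page = {
    val chunk = (1 to 60).slice(token, token + 25).toList
    val next  = token + chunk.size
    Page(chunk, if (next < 60) Some(next) else None)
  }

  // The state threaded through unfold is the token for the next call, or
  // None once the server has reported that nothing is left.
  def allItems(start: Int = 0): Iterator[Int] =
    Iterator
      .unfold(Option(start)) { tokenOpt =>
        tokenOpt.map { token =>
          val page = fetchPage(token)
          (page.items, page.nextToken) // emit one chunk, carry the next token
        }
      }
      .flatten // turn the iterator of chunks into an iterator of items
}
```

Nothing is fetched until the iterator is consumed, and each further chunk is requested only once the previous one has run out. Unlike a Stream, an Iterator does not remember what it has produced, so it can be traversed only once.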

As a matter of fact, in the meantime I already found a slightly different answer that I find more readable (indeed using Streams):

def fetchFromServer(uglyStateObject: StateObject): Stream[Thing] = {
    val results = server.call(uglyStateObject)

    uglyStateObject.update(results.token())

    results.asScala.toStream #::: (if (results.moreAvailable())
        fetchFromServer(uglyStateObject)
    else
        Stream.empty)
}
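
Since Scala 2.13, Stream is deprecated in favour of LazyList, and the same pattern carries over. Below is a minimal sketch against a fake server, mainly to show that the next page is only requested once the current one has been consumed. FakeServer, Page and the Int tokens are hypothetical stand-ins for the real Java client, not part of it:

```scala
// Sketch only: FakeServer and Page are hypothetical stand-ins for the real
// Java client; requires Scala 2.13+ for LazyList.
object LazyPaging {
  final case class Page(items: List[Int], token: Int, moreAvailable: Boolean)

  object FakeServer {
    var calls = 0                       // counts round trips, to observe laziness
    private val data = (1 to 60).toList // pretend the server holds 60 items

    def call(token: Int): Page = {
      calls += 1
      val chunk = data.slice(token, token + 25) // at most 25 items per call
      val next  = token + chunk.size
      Page(chunk, next, next < data.size)
    }
  }

  def fetchAll(token: Int = 0): LazyList[Int] = {
    val page = FakeServer.call(token)
    // #::: takes its right-hand side by name, so the recursive call is
    // deferred until the items of the current page have been consumed.
    page.items.to(LazyList) #::: (if (page.moreAvailable)
      fetchAll(page.token)
    else
      LazyList.empty)
  }
}
```

In this sketch the first server call happens as soon as fetchAll() is invoked; every later call is made only when the consumer walks past the end of the page already in memory. Because LazyList memoizes, traversing the same value again does not hit the server a second time.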

Thanks everyone for